py_hamt

from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .hamt_to_sharded_converter import convert_hamt_to_sharded, sharded_converter_cli
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
    "ShardedZarrStore",
    "convert_hamt_to_sharded",
    "sharded_converter_cli",
]
def blake3_hashfn(input_bytes: bytes) -> bytes:
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    This is the default blake3 hash function used for the `HAMT`, with a 32 byte hash size.
    """
    # 32 bytes is the recommended and default digest size for blake3, but multihash requires us to specify it explicitly
    digest: bytes = b3.digest(input_bytes, size=32)
    raw_bytes: bytes = b3.unwrap(digest)
    return raw_bytes

This is the default blake3 hash function used for the HAMT, with a 32 byte hash size.
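As a quick illustration of the contract, the return value is the raw 32-byte blake3 digest with the multihash wrapper stripped:

```python
from py_hamt import blake3_hashfn

digest = blake3_hashfn(b"some key")
assert len(digest) == 32  # raw blake3 bytes, multihash prefix removed
```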

class HAMT:
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

    Note that in read+write mode, `root_node_id` IS NOT VALID. You should call `make_read_only()` to convert to read-only mode and then read `root_node_id`.

    These optimizations trade off memory use for performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a while to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id # our root node CID
    print(cid)
    ```
    """
    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a multiple of 8 bits, since a python bytes object can only represent whole bytes.

        Theoretically the hash size only needs to be a minimum of 1 byte, and the number of hash collisions for a given hash must be less than or equal to the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to store python bytes objects in the hamt. This improves performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, and thus more time to retrieve and decode each node from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read-only mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read-only or read/write mode.

        The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)
    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this behaves the same as creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a massive blocking operation; don't run it concurrently with a bunch of other operations
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read-only, which allows for more parallel read operations. The HAMT also needs to be in read-only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations, which means the root node ID is no longer valid; to read it at the end of your operations, first call `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream, so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)
    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store.
        Takes a stack of nodes; we represent a stack with a list where the first element is the root element and the last element is the top of the stack.
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python.
        If a node ends up being empty, then it is deleted entirely, unless it is the root node.
        Modifies in place.
        """
        # Iterate in the reverse direction; this range goes from n-1 to 0, from the bottommost tree node to the root
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                # When removing links, don't worry about two nodes having the same link, since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Remove from our stack, continue reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # If not an empty node, just reserialize like normal and replace this one
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # If this is not the root node, we need to update the link in the previous node in the list, since we just reserialized
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)
    # Automatically skip encoding if the value provided is of the bytes variety
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non-bytes value when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert
            # This is needed if any buckets overflow and we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = []
            kvs_queue.append((key, val_ptr))

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    for k in bucket:
                        v_ptr = bucket[k]
                        kvs_queue.append((k, v_ptr))

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]
    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time, so this doesn't have a _delete_pointer counterpart
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out: whether or not the key was found, this bucket is where it would have been, so either reserialize or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError
    async def get(
        self,
        key: str,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(
            pointer, offset=offset, length=length, suffix=suffix
        )
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python, so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore`, for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        raw_hash: bytes = self.hash_fn(key.encode())

        current_id: IPLDKind = self.root_node_id
        current_depth: int = 0

        # Don't check if the result is None; use a boolean to indicate finding something, since None is a possible value of IPLDKind
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id: IPLDKind = current_id
            top_node: Node = await self.node_store.load(top_id)
            map_key: int = extract_bits(raw_hash, current_depth, 8)

            # Check if this key is in one of the buckets
            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    result_ptr = bucket[key]
                    found_a_result = True
                    break

            if isinstance(item, list):
                link: IPLDKind = item[0]
                current_id = link
                current_depth += 1
                continue

            # Nowhere left to go, stop walking down the tree
            break

        if not found_a_result:
            raise KeyError

        return result_ptr
    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(list(node.iter_links()))

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read-only mode, however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key-value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read-only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count

An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

Use this to store arbitrarily large key-value mappings in your CAS of choice.

For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

When in read-only mode, the HAMT is both async and thread safe.

A note about memory management, read+write and read-only modes

The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

Note that in read+write mode, root_node_id IS NOT VALID. You should call make_read_only() to convert to read-only mode and then read root_node_id.

These optimizations trade off memory use for performance. Use cache_size to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a while to run. Use cache_vacate if you are over your memory limits.

IPFS HAMT Sample Code

kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id # our root node CID
print(cid)
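To illustrate the concurrency guarantees described above, here is a minimal sketch: writes happen sequentially, then reads run concurrently once the HAMT is read-only. It assumes a local Kubo daemon, as in the sample code.

```python
import asyncio
from py_hamt import HAMT, KuboCAS

async def main() -> None:
    kubo_cas = KuboCAS()  # local kubo daemon with default endpoints
    hamt = await HAMT.build(cas=kubo_cas)

    # Writes are async safe but must all happen in one thread's event loop
    for i in range(10):
        await hamt.set(f"key-{i}", i)

    # In read-only mode, gets may run concurrently
    await hamt.make_read_only()
    values = await asyncio.gather(*(hamt.get(f"key-{i}") for i in range(10)))
    assert values == list(range(10))

asyncio.run(main())
```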
HAMT(cas: ContentAddressedStore, hash_fn: Callable[[bytes], bytes] = blake3_hashfn, root_node_id: IPLDKind | None = None, read_only: bool = False, max_bucket_size: int = 4, values_are_bytes: bool = False)

Use build if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.

cas: ContentAddressedStore

The backing storage system. py-hamt provides an implementation KuboCAS for IPFS.

hash_fn: Callable[[bytes], bytes]

This is the hash function used to place a key-value within the HAMT.

To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

It's important to note that the resulting hash must always be a multiple of 8 bits, since a python bytes object can only represent whole bytes.

Theoretically the hash size only needs to be a minimum of 1 byte, and the number of hash collisions for a given hash must be less than or equal to the bucket size. Any more and the HAMT will most likely throw errors.
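For illustration, a custom hash function could wrap hashlib's sha256; this is a hypothetical substitute for the default blake3_hashfn, shown only to demonstrate the expected signature:

```python
import hashlib

from py_hamt import HAMT, InMemoryCAS

def sha256_hashfn(input_bytes: bytes) -> bytes:
    # bytes in, digest bytes out; the 32-byte digest is a whole number of bytes
    return hashlib.sha256(input_bytes).digest()

async def demo() -> None:
    # The same hash_fn must be used when reading this HAMT back later
    hamt = await HAMT.build(cas=InMemoryCAS(), hash_fn=sha256_hashfn)
```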

values_are_bytes: bool

Set this to true if you are only going to store python bytes objects in the hamt. This improves performance by skipping a serialization step from IPLDKind.

This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
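A short sketch of the bytes-only fast path:

```python
from py_hamt import HAMT, InMemoryCAS

async def demo() -> None:
    hamt = await HAMT.build(cas=InMemoryCAS(), values_are_bytes=True)
    # Values must already be bytes; the dag-cbor encoding step is skipped
    await hamt.set("chunk/0", b"\x00\x01\x02\x03")
    assert await hamt.get("chunk/0") == b"\x00\x01\x02\x03"
```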

max_bucket_size: int

This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, and thus more time to retrieve and decode each node from your backing CAS.

This must be a positive integer with a minimum of 1.

root_node_id: IPLDKind

This is type IPLDKind but the documentation generator pdoc mangles it a bit.

Read from this only when in read mode to get something valid!

read_only: bool

Clients should NOT modify this.

This is here for checking whether the HAMT is in read only or read/write mode.

The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.

@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> HAMT:

Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as __init__. If the root_node_id is not None, this behaves the same as creating a HAMT instance with __init__.

This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
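A minimal sketch of bootstrapping an empty HAMT, using the in-memory test backend for brevity:

```python
from py_hamt import HAMT, InMemoryCAS

async def demo() -> None:
    # root_node_id is None here, so build() first saves an empty root node to the CAS
    hamt = await HAMT.build(cas=InMemoryCAS())
    await hamt.set("greeting", "hello")
    assert await hamt.len() == 1
```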

async def make_read_only(self) -> None:

Makes the HAMT read-only, which allows for more parallel read operations. The HAMT also needs to be in read-only mode to get the real root node ID.

In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.

async def enable_write(self) -> None:

Enable both reads and writes. This creates an internal structure for performance optimizations, which means the root node ID is no longer valid; to read it at the end of your operations, first call make_read_only.
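Putting the two mode switches together, a sketch of a write, snapshot, write-again cycle:

```python
from py_hamt import HAMT, InMemoryCAS

async def demo() -> None:
    hamt = await HAMT.build(cas=InMemoryCAS())
    await hamt.set("a", 1)

    await hamt.make_read_only()
    snapshot_id = hamt.root_node_id  # only valid while read-only

    await hamt.enable_write()
    await hamt.set("b", 2)  # snapshot_id still identifies the earlier state
```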

async def cache_size(self) -> int:

Returns the memory used by some internal performance optimization tools in bytes.

This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

Be warned that this may take a while to run for large HAMTs.

For more on memory management, see the HAMT class documentation.

async def cache_vacate(self) -> None:

Vacate and completely empty out the internal read/write cache.

Be warned that this may take a while if there have been a lot of write operations.

For more on memory management, see the HAMT class documentation.
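For example, a rough memory guard could combine the two; the 100 MiB budget here is an arbitrary illustration:

```python
from py_hamt import HAMT

async def flush_if_over_budget(hamt: HAMT, budget_bytes: int = 100 * 1024 * 1024) -> None:
    # cache_size() walks internal structures, so it can be slow on large HAMTs
    if await hamt.cache_size() > budget_bytes:
        await hamt.cache_vacate()
```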

async def set(self, key: str, val: IPLDKind) -> None:

Write a key-value mapping.

async def delete(self, key: str) -> None:

Delete a key-value mapping.

async def get(self, key: str, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> IPLDKind:

Get a value.
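The offset, length, and suffix parameters allow partial reads of a stored value. A sketch, assuming values_are_bytes=True since a byte range of a dag-cbor encoded value would generally fail to decode:

```python
from py_hamt import HAMT, InMemoryCAS

async def demo() -> None:
    hamt = await HAMT.build(cas=InMemoryCAS(), values_are_bytes=True)
    await hamt.set("blob", b"0123456789")

    assert await hamt.get("blob", offset=2, length=3) == b"234"
    assert await hamt.get("blob", suffix=4) == b"6789"  # last four bytes
```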

async def get_pointer(self, key: str) -> IPLDKind:

Get a store ID that points to the value for this key.

This is useful for some applications that want to implement a read cache. Due to the restrictions of ContentAddressedStore on IDs, pointers are regarded as immutable by python, so they can be easily used as IDs for read caches. This is utilized in ZarrHAMTStore, for example.
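A sketch of the read-cache pattern this enables; this hypothetical cache is not the one ZarrHAMTStore uses:

```python
from py_hamt import HAMT

value_cache: dict = {}

async def cached_load(hamt: HAMT, key: str) -> bytes:
    # Pointers are immutable and hashable, so they work as dict keys
    pointer = await hamt.get_pointer(key)
    if pointer not in value_cache:
        # Assumes values_are_bytes=True; otherwise decode with dag_cbor
        value_cache[pointer] = await hamt.cas.load(pointer)
    return value_cache[pointer]
```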

async def keys(self) -> AsyncIterator[str]:

AsyncIterator returning all keys in the HAMT.

If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

When the HAMT is in read-only mode, however, this can be run concurrently with get operations.
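For instance, draining the iterator into a list; keys() streams, so large HAMTs can also be consumed incrementally instead:

```python
async def all_keys(hamt) -> list[str]:
    return [key async for key in hamt.keys()]
```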

async def len(self) -> int:

Return the number of key-value mappings in this HAMT.

When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read-only, then this can be run concurrently with other operations.

class ContentAddressedStore(abc.ABC):
class ContentAddressedStore(ABC):
    """
    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

    #### A note on the IPLDKind return types
    Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
    1. No lists or dicts, since python does not classify these as immutable.
    2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
    """

    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la the IPLD data model.
        """

    @abstractmethod
    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """Retrieve data."""

    async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Pin a CID in the storage."""
        pass  # pragma: no cover

    async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Unpin a CID in the storage."""
        pass  # pragma: no cover

    async def pin_update(
        self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
    ) -> None:
        """Update the pinned CID in the storage."""
        pass  # pragma: no cover

    async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]:
        """List all pinned CIDs in the storage."""
        return []  # pragma: no cover

Abstract class that represents a content addressed storage that the HAMT can use for keeping data.

Note that the return type of save and input to load is really type IPLDKind, but the documentation generator pdoc mangles it unfortunately.

A note on the IPLDKind return types

Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:

  1. No lists or dicts, since python does not classify these as immutable.
  2. No None values since this is used in HAMT's __init__ to indicate that an empty HAMT needs to be initialized.
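To make the contract concrete, here is a minimal sketch of a custom backend; DirectoryCAS is hypothetical and stores each blob in a file named by its sha256 hex digest, which python treats as an immutable str ID:

```python
import hashlib
from pathlib import Path
from typing import Optional

from py_hamt import ContentAddressedStore

class DirectoryCAS(ContentAddressedStore):
    """Toy content addressed store: one file per blob, addressed by sha256 hex."""

    def __init__(self, root: Path):
        self.root = root

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> str:
        # codec is ignored here; a real backend might record it alongside the blob
        file_id = hashlib.sha256(data).hexdigest()
        (self.root / file_id).write_bytes(data)
        return file_id

    async def load(
        self,
        id,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        data = (self.root / str(id)).read_bytes()
        if offset is not None:
            return data[offset : offset + length] if length is not None else data[offset:]
        if suffix is not None:
            return data[-suffix:]
        return data
```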
CodecInput = typing.Literal['raw', 'dag-cbor']
@abstractmethod
async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> IPLDKind:

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

@abstractmethod
async def load(self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes:

Retrieve data.

async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:

Pin a CID in the storage.

async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:

Unpin a CID in the storage.

async def pin_update(self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str) -> None:

Update the pinned CID in the storage.

async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]:

List all pinned CIDs in the storage.

class InMemoryCAS(py_hamt.ContentAddressedStore):
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self):
        self.store = dict()
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contravariance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )
        data: bytes
        try:
            data = self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

        if offset is not None:
            start = offset
            if length is not None:
                end = start + length
                return data[start:end]
            else:
                return data[start:]
        elif suffix is not None:
            # Return only the last `suffix` bytes
            return data[-suffix:]
        else:  # Full load
            return data

Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that save returns and load takes in.

store: dict[bytes, bytes]
hash_alg: multiformats.multihash.Multihash
async def save(self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> bytes:

Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

async def load(self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes:

ContentAddressedStore allows any IPLD scalar key. For the in-memory backend we require a bytes hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contravariance rule and emits the "violates Liskov substitution principle" error. This is why we use cast here, to tell mypy that we know what we are doing. h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

class KuboCAS(py_hamt.ContentAddressedStore):
119class KuboCAS(ContentAddressedStore):
120    """
121    Connects to an **IPFS Kubo** daemon.
122
123    The IDs in save and load are IPLD CIDs.
124
125    * **save()**  → RPC  (`/api/v0/add`)
126    * **load()**  → HTTP gateway  (`/ipfs/{cid}`)
127
128    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
129
130    ### Authentication / custom headers
131    You have two options:
132
133    1. **Bring your own `httpx.AsyncClient`**
134       Pass it via `client=...` — any default headers or auth
135       configured on that client are reused for **every** request.
136    2. **Let `KuboCAS` build the client** but pass
137       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
138       internally-created `AsyncClient`.
139
140    ```python
141    import httpx
142    from py_hamt import KuboCAS
143
144    # Option 1: user-supplied client
145    client = httpx.AsyncClient(
146        headers={"Authorization": "Bearer <token>"},
147        auth=("user", "pass"),
148    )
149    cas = KuboCAS(client=client)
150
151    # Option 2: let KuboCAS create the client
152    cas = KuboCAS(
153        headers={"X-My-Header": "yes"},
154        auth=("user", "pass"),
155    )
156    ```
157
158    ### Parameters
159    - **hasher** (str): multihash name (defaults to *blake3*).
160    - **client** (`httpx.AsyncClient | None`): reuse an existing
161      client; if *None* KuboCAS will create one lazily.
162    - **headers** (dict[str, str] | None): default headers for the
163      internally-created client.
164    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
165      for the internally-created client.
166    - **rpc_base_url / gateway_base_url** (str | None): override daemon
167      endpoints (defaults match the local daemon ports).
168    - **chunker** (str): chunking algorithm specification for Kubo's `add`
169      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
170      `"rabin-<min>-<avg>-<max>"`.
171
172    ...
173    """
174
175    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
176    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"
177
178    DAG_PB_MARKER: int = 0x70
179    """@private"""
180
181    # Take in an httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
182    def __init__(
183        self,
184        hasher: str = "blake3",
185        client: httpx.AsyncClient | None = None,
186        rpc_base_url: str | None = None,
187        gateway_base_url: str | None = None,
188        concurrency: int = 32,
189        *,
190        headers: dict[str, str] | None = None,
191        auth: Tuple[str, str] | None = None,
192        pin_on_add: bool = False,
193        chunker: str = "size-1048576",
194        max_retries: int = 3,
195        initial_delay: float = 1.0,
196        backoff_factor: float = 2.0,
197    ):
198        """
199        If `None` is passed for the RPC or gateway base URL, the defaults for a local Kubo daemon are used; the same applies if nothing is passed in at all.
200
201        ### `httpx.AsyncClient` Management
202        If `client` is not provided, it will be automatically initialized. It is the user's responsibility to close it at an appropriate time, using `await cas.aclose()`,
203        since a class instance cannot know when it will no longer be in use unless explicitly told.
204
205        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited, which is the approach we suggest:
206        ```python
207        async with httpx.AsyncClient() as client, KuboCAS(
208            rpc_base_url=rpc_base_url,
209            gateway_base_url=gateway_base_url,
210            client=client,
211        ) as kubo_cas:
212            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
213            zhs = ZarrHAMTStore(hamt)
214            # Use the KuboCAS instance as needed
215            # ...
216        ```
217        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
218        ``` python
219        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
220        # Use the KuboCAS instance as needed
221        # ...
222        await cas.aclose()  # Ensure resources are cleaned up
223        ```
224
225        ### Authenticated RPC/Gateway Access
226        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
227        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
228        If you do not need authentication, you can leave these parameters as `None`.
229
230        ### RPC and HTTP Gateway Base URLs
231        These are the first part of the URL; the defaults match the ports a local Kubo daemon launches with.
232        """
233
234        self._owns_client: bool = False
235        self._closed: bool = True
236        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
237        self._default_headers = headers
238        self._default_auth = auth
239
240        # Now, perform validation that might raise an exception
241        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
242        if re.fullmatch(chunker_pattern, chunker) is None:
243            raise ValueError("Invalid chunker specification")
244        self.chunker: str = chunker
245
246        self.hasher: str = hasher
247        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
248
249        if rpc_base_url is None:
250            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma: no cover
251        if gateway_base_url is None:
252            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
253
254        if "/ipfs/" in gateway_base_url:
255            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
256
257        # Standard gateway URL construction with proper path handling
258        if gateway_base_url.endswith("/"):
259            gateway_base_url = f"{gateway_base_url}ipfs/"
260        else:
261            gateway_base_url = f"{gateway_base_url}/ipfs/"
262
263        pin_string: str = "true" if pin_on_add else "false"
264        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
265        """@private"""
266        self.gateway_base_url: str = gateway_base_url
267        """@private"""
268
269        if client is not None:
270            # A client was supplied by the user. We don't own it.
271            self._owns_client = False
272            self._client_per_loop = {asyncio.get_running_loop(): client}
273        else:
274            # No client supplied. We will own any clients we create.
275            self._owns_client = True
276            self._client_per_loop = {}
277
278        # store for later use by _loop_client()
279        self._default_headers = headers
280        self._default_auth = auth
281
282        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
283        self._closed = False
284
285        # Validate retry parameters
286        if max_retries < 0:
287            raise ValueError("max_retries must be non-negative")
288        if initial_delay <= 0:
289            raise ValueError("initial_delay must be positive")
290        if backoff_factor < 1.0:
291            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
292
293        self.max_retries = max_retries
294        self.initial_delay = initial_delay
295        self.backoff_factor = backoff_factor
296
297    # --------------------------------------------------------------------- #
298    # helper: get or create the client bound to the current running loop    #
299    # --------------------------------------------------------------------- #
300    def _loop_client(self) -> httpx.AsyncClient:
301        """Get or create a client for the current event loop.
302
303        If the instance was previously closed but owns its clients, a fresh
304        client mapping is lazily created on demand.  Users that supplied their
305        own ``httpx.AsyncClient`` still receive an error when the instance has
306        been closed, as we cannot safely recreate their client.
307        """
308        if self._closed:
309            if not self._owns_client:
310                raise RuntimeError("KuboCAS is closed; create a new instance")
311            # We previously closed all internally-owned clients. Reset the
312            # state so that new clients can be created lazily.
313            self._closed = False
314            self._client_per_loop = {}
315
316        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
317        try:
318            return self._client_per_loop[loop]
319        except KeyError:
320            # Create a new client
321            client = httpx.AsyncClient(
322                timeout=60.0,
323                headers=self._default_headers,
324                auth=self._default_auth,
325                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
326                # Uncomment when they finally support Robust HTTP/2 GOAWAY responses
327                # http2=True,
328            )
329            self._client_per_loop[loop] = client
330            return client
331
332    # --------------------------------------------------------------------- #
333    # graceful shutdown: close **all** clients we own                       #
334    # --------------------------------------------------------------------- #
335    async def aclose(self) -> None:
336        """
337        Closes all internally-created clients. Must be called from an async context.
338        """
339        if self._owns_client is False:  # external client → caller closes
340            return
341
342        # This method is async, so we can reliably await the async close method.
343        # The complex sync/async logic is handled by __del__.
344        for client in list(self._client_per_loop.values()):
345            if not client.is_closed:
346                try:
347                    await client.aclose()
348                except Exception:
349                    pass  # best-effort cleanup
350
351        self._client_per_loop.clear()
352        self._closed = True
353
354    # At this point, _client_per_loop should be empty or only contain
355    # clients from loops we haven't seen (which shouldn't happen in practice)
356    async def __aenter__(self) -> "KuboCAS":
357        return self
358
359    async def __aexit__(self, *exc: Any) -> None:
360        await self.aclose()
361
362    def __del__(self) -> None:
363        """Best-effort close for internally-created clients."""
364        if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
365            return
366
367        if not self._owns_client or self._closed:
368            return
369
370        # Attempt proper cleanup if possible
371        try:
372            loop = asyncio.get_running_loop()
373        except RuntimeError:
374            # No running loop - can't do async cleanup
375            # Just clear the client references synchronously
376            if hasattr(self, "_client_per_loop"):
377                # We can't await client.aclose() without a loop,
378                # so just clear the references
379                self._client_per_loop.clear()
380                self._closed = True
381            return
382
383        # If we get here, we have a running loop
384        try:
385            if loop.is_running():
386                # Schedule cleanup in the existing loop
387                loop.create_task(self.aclose())
388            else:
389                # Loop exists but not running - try asyncio.run
390                coro = self.aclose()  # Create the coroutine
391                try:
392                    asyncio.run(coro)
393                except Exception:
394                    # If asyncio.run fails, we need to close the coroutine properly
395                    coro.close()  # This prevents the RuntimeWarning
396                    raise  # Re-raise to hit the outer except block
397        except Exception:
398            # If all else fails, just clear references
399            if hasattr(self, "_client_per_loop"):
400                self._client_per_loop.clear()
401                self._closed = True
402
403    # --------------------------------------------------------------------- #
404    # save() – now uses the per-loop client                                 #
405    # --------------------------------------------------------------------- #
406    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
407        async with self._sem:
408            files = {"file": data}
409            client = self._loop_client()
410            retry_count = 0
411
412            while retry_count <= self.max_retries:
413                try:
414                    response = await client.post(
415                        self.rpc_url, files=files, timeout=60.0
416                    )
417                    response.raise_for_status()
418                    cid_str: str = response.json()["Hash"]
419                    cid: CID = CID.decode(cid_str)
420                    if cid.codec.code != self.DAG_PB_MARKER:
421                        cid = cid.set(codec=codec)
422                    return cid
423
424                except (httpx.TimeoutException, httpx.RequestError) as e:
425                    retry_count += 1
426                    if retry_count > self.max_retries:
427                        raise httpx.TimeoutException(
428                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
429                            request=e.request
430                            if isinstance(e, httpx.RequestError)
431                            else None,
432                        )
433
434                    # Calculate backoff delay
435                    delay = self.initial_delay * (
436                        self.backoff_factor ** (retry_count - 1)
437                    )
438                    # Add some jitter to prevent thundering herd
439                    jitter = delay * 0.1 * (random.random() - 0.5)
440                    await asyncio.sleep(delay + jitter)
441
442                except httpx.HTTPStatusError:
443                    # Re-raise non-timeout HTTP errors immediately
444                    raise
445        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
446
447    async def load(
448        self,
449        id: IPLDKind,
450        offset: Optional[int] = None,
451        length: Optional[int] = None,
452        suffix: Optional[int] = None,
453    ) -> bytes:
454        """Load data from a CID using the IPFS gateway with optional Range requests."""
455        cid = cast(CID, id)
456        url: str = f"{self.gateway_base_url + str(cid)}"
457        headers: Dict[str, str] = {}
458
459        # Construct the Range header if required
460        if offset is not None:
461            start = offset
462            if length is not None:
463                # Standard HTTP Range: bytes=start-end (inclusive)
464                end = start + length - 1
465                headers["Range"] = f"bytes={start}-{end}"
466            else:
467                # Standard HTTP Range: bytes=start- (from start to end)
468                headers["Range"] = f"bytes={start}-"
469        elif suffix is not None:
470            # Standard HTTP Range: bytes=-N (last N bytes)
471            headers["Range"] = f"bytes=-{suffix}"
472
473        async with self._sem:  # Throttle gateway
474            client = self._loop_client()
475            retry_count = 0
476
477            while retry_count <= self.max_retries:
478                try:
479                    response = await client.get(
480                        url, headers=headers or None, timeout=60.0
481                    )
482                    response.raise_for_status()
483                    return response.content
484
485                except (httpx.TimeoutException, httpx.RequestError) as e:
486                    retry_count += 1
487                    if retry_count > self.max_retries:
488                        raise httpx.TimeoutException(
489                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
490                            request=e.request
491                            if isinstance(e, httpx.RequestError)
492                            else None,
493                        )
494
495                    # Calculate backoff delay with jitter
496                    delay = self.initial_delay * (
497                        self.backoff_factor ** (retry_count - 1)
498                    )
499                    jitter = delay * 0.1 * (random.random() - 0.5)
500                    await asyncio.sleep(delay + jitter)
501
502                except httpx.HTTPStatusError:
503                    # Re-raise non-timeout HTTP errors immediately
504                    raise
505
506        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
507
508    # --------------------------------------------------------------------- #
509    # pin_cid() – method to pin a CID                                       #
510    # --------------------------------------------------------------------- #
511    async def pin_cid(
512        self,
513        cid: CID,
514        target_rpc: str = "http://127.0.0.1:5001",
515    ) -> None:
516        """
517        Pins a CID to the local Kubo node via the RPC API.
518
519        This call is recursive by default, pinning all linked objects.
520
521        Args:
522            cid (CID): The Content ID to pin.
523            target_rpc (str): The RPC URL of the Kubo node.
524        """
525        params = {"arg": str(cid), "recursive": "true"}
526        pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add"
527
528        async with self._sem:  # throttle RPC
529            client = self._loop_client()
530            response = await client.post(pin_add_url_base, params=params)
531            response.raise_for_status()
532
533    async def unpin_cid(
534        self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
535    ) -> None:
536        """
537        Unpins a CID from the local Kubo node via the RPC API.
538
539        Args:
540            cid (CID): The Content ID to unpin.
541        """
542        params = {"arg": str(cid), "recursive": "true"}
543        unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm"
544        async with self._sem:  # throttle RPC
545            client = self._loop_client()
546            response = await client.post(unpin_url_base, params=params)
547            response.raise_for_status()
548
549    async def pin_update(
550        self,
551        old_id: IPLDKind,
552        new_id: IPLDKind,
553        target_rpc: str = "http://127.0.0.1:5001",
554    ) -> None:
555        """
556        Replaces a pinned CID with a new one on the local Kubo node via the RPC API.
557
558        Args:
559            old_id (IPLDKind): The old Content ID to replace.
560            new_id (IPLDKind): The new Content ID to pin.
561        """
562        params = {"arg": [str(old_id), str(new_id)]}
563        pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update"
564        async with self._sem:  # throttle RPC
565            client = self._loop_client()
566            response = await client.post(pin_update_url_base, params=params)
567            response.raise_for_status()
568
569    async def pin_ls(
570        self, target_rpc: str = "http://127.0.0.1:5001"
571    ) -> list[Dict[str, Any]]:
572        """
573        Lists all pinned CIDs on the local Kubo node via the RPC API.
574
575        Args:
576            target_rpc (str): The RPC URL of the Kubo node.
577
578        Returns:
579            list[Dict[str, Any]]: The pin entries reported by Kubo.
580        """
581        pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
582        async with self._sem:  # throttle RPC
583            client = self._loop_client()
584            response = await client.post(pin_ls_url_base)
585            response.raise_for_status()
586            pins = response.json().get("Keys", [])
587            return pins

Connects to an IPFS Kubo daemon.

The IDs in save and load are IPLD CIDs.

  • save() → RPC (/api/v0/add)
  • load() → HTTP gateway (/ipfs/{cid})

save uses the RPC API and load uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.

Authentication / custom headers

You have two options:

  1. Bring your own httpx.AsyncClient: pass it via client=... — any default headers or auth configured on that client are reused for every request.
  2. Let KuboCAS build the client but pass headers= and/or auth= kwargs; they are forwarded to the internally-created AsyncClient.
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied client
client = httpx.AsyncClient(
    headers={"Authorization": "Bearer <token>"},
    auth=("user", "pass"),
)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the client
cas = KuboCAS(
    headers={"X-My-Header": "yes"},
    auth=("user", "pass"),
)

Parameters

  • hasher (str): multihash name (defaults to blake3).
  • client (httpx.AsyncClient | None): reuse an existing client; if None KuboCAS will create one lazily.
  • headers (dict[str, str] | None): default headers for the internally-created client.
  • auth (tuple[str, str] | None): authentication tuple (username, password) for the internally-created client.
  • rpc_base_url / gateway_base_url (str | None): override daemon endpoints (defaults match the local daemon ports).
  • chunker (str): chunking algorithm specification for Kubo's add RPC. Accepted formats are "size-<positive int>", "rabin", or "rabin-<min>-<avg>-<max>".

...
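
As a quick illustration of the accepted `chunker` formats, the following standalone checks mirror the validation pattern used in `__init__` (shown below); the example specs are arbitrary values chosen here, not library defaults.

```python
import re

# The same pattern __init__ uses to validate the chunker argument.
chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"

for ok in ("size-1048576", "rabin", "rabin-16384-65536-131072"):
    assert re.fullmatch(chunker_pattern, ok) is not None

# Rejected: zero or missing size, and a rabin spec without all three of min-avg-max.
for bad in ("size-0", "size-", "rabin-16384-65536"):
    assert re.fullmatch(chunker_pattern, bad) is None
```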

KuboCAS( hasher: str = 'blake3', client: httpx.AsyncClient | None = None, rpc_base_url: str | None = None, gateway_base_url: str | None = None, concurrency: int = 32, *, headers: dict[str, str] | None = None, auth: Optional[Tuple[str, str]] = None, pin_on_add: bool = False, chunker: str = 'size-1048576', max_retries: int = 3, initial_delay: float = 1.0, backoff_factor: float = 2.0)
182    def __init__(
183        self,
184        hasher: str = "blake3",
185        client: httpx.AsyncClient | None = None,
186        rpc_base_url: str | None = None,
187        gateway_base_url: str | None = None,
188        concurrency: int = 32,
189        *,
190        headers: dict[str, str] | None = None,
191        auth: Tuple[str, str] | None = None,
192        pin_on_add: bool = False,
193        chunker: str = "size-1048576",
194        max_retries: int = 3,
195        initial_delay: float = 1.0,
196        backoff_factor: float = 2.0,
197    ):
198        """
199        If `None` is passed for the RPC or gateway base URL, the defaults for a local Kubo daemon are used; the same applies if nothing is passed in at all.
200
201        ### `httpx.AsyncClient` Management
202        If `client` is not provided, it will be automatically initialized. It is the user's responsibility to close it at an appropriate time, using `await cas.aclose()`,
203        since a class instance cannot know when it will no longer be in use unless explicitly told.
204
205        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited, which is the approach we suggest:
206        ```python
207        async with httpx.AsyncClient() as client, KuboCAS(
208            rpc_base_url=rpc_base_url,
209            gateway_base_url=gateway_base_url,
210            client=client,
211        ) as kubo_cas:
212            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
213            zhs = ZarrHAMTStore(hamt)
214            # Use the KuboCAS instance as needed
215            # ...
216        ```
217        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
218        ``` python
219        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
220        # Use the KuboCAS instance as needed
221        # ...
222        await cas.aclose()  # Ensure resources are cleaned up
223        ```
224
225        ### Authenticated RPC/Gateway Access
226        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
227        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
228        If you do not need authentication, you can leave these parameters as `None`.
229
230        ### RPC and HTTP Gateway Base URLs
231        These are the first part of the URL; the defaults match the ports a local Kubo daemon launches with.
232        """
233
234        self._owns_client: bool = False
235        self._closed: bool = True
236        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
237        self._default_headers = headers
238        self._default_auth = auth
239
240        # Now, perform validation that might raise an exception
241        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
242        if re.fullmatch(chunker_pattern, chunker) is None:
243            raise ValueError("Invalid chunker specification")
244        self.chunker: str = chunker
245
246        self.hasher: str = hasher
247        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""
248
249        if rpc_base_url is None:
250            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma: no cover
251        if gateway_base_url is None:
252            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL
253
254        if "/ipfs/" in gateway_base_url:
255            gateway_base_url = gateway_base_url.split("/ipfs/")[0]
256
257        # Standard gateway URL construction with proper path handling
258        if gateway_base_url.endswith("/"):
259            gateway_base_url = f"{gateway_base_url}ipfs/"
260        else:
261            gateway_base_url = f"{gateway_base_url}/ipfs/"
262
263        pin_string: str = "true" if pin_on_add else "false"
264        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
265        """@private"""
266        self.gateway_base_url: str = gateway_base_url
267        """@private"""
268
269        if client is not None:
270            # A client was supplied by the user. We don't own it.
271            self._owns_client = False
272            self._client_per_loop = {asyncio.get_running_loop(): client}
273        else:
274            # No client supplied. We will own any clients we create.
275            self._owns_client = True
276            self._client_per_loop = {}
277
278        # store for later use by _loop_client()
279        self._default_headers = headers
280        self._default_auth = auth
281
282        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
283        self._closed = False
284
285        # Validate retry parameters
286        if max_retries < 0:
287            raise ValueError("max_retries must be non-negative")
288        if initial_delay <= 0:
289            raise ValueError("initial_delay must be positive")
290        if backoff_factor < 1.0:
291            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
292
293        self.max_retries = max_retries
294        self.initial_delay = initial_delay
295        self.backoff_factor = backoff_factor

If None is passed for the RPC or gateway base URL, the defaults for a local Kubo daemon are used; the same applies if nothing is passed in at all.

httpx.AsyncClient Management

If client is not provided, it will be automatically initialized. It is the user's responsibility to close it at an appropriate time, using await cas.aclose(), since a class instance cannot know when it will no longer be in use unless explicitly told.

If you are using the KuboCAS instance in an async with block, it will automatically close the client when the block is exited, which is the approach we suggest:

async with httpx.AsyncClient() as client, KuboCAS(
    rpc_base_url=rpc_base_url,
    gateway_base_url=gateway_base_url,
    client=client,
) as kubo_cas:
    hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
    zhs = ZarrHAMTStore(hamt)
    # Use the KuboCAS instance as needed
    # ...

As mentioned, if you do not use the async with syntax, you should call await cas.aclose() when you are done using the instance to ensure that all resources are cleaned up.

cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose()  # Ensure resources are cleaned up

Authenticated RPC/Gateway Access

Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own httpx.AsyncClient and then passing that in. Alternatively, they can pass in headers and auth parameters to the constructor, which will be used to create a new httpx.AsyncClient if one is not provided. If you do not need authentication, you can leave these parameters as None.

RPC and HTTP Gateway Base URLs

These are the first part of the URL; the defaults match the ports a local Kubo daemon launches with.
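
The gateway base URL handling in `__init__` amounts to: strip any trailing `/ipfs/...` segment, then append `/ipfs/` with correct slash handling. A standalone sketch of that normalization:

```python
def normalize_gateway(gateway_base_url: str) -> str:
    # Strip an existing /ipfs/ path (and anything after it).
    if "/ipfs/" in gateway_base_url:
        gateway_base_url = gateway_base_url.split("/ipfs/")[0]
    # Append /ipfs/ without doubling the slash.
    if gateway_base_url.endswith("/"):
        return f"{gateway_base_url}ipfs/"
    return f"{gateway_base_url}/ipfs/"


assert normalize_gateway("http://127.0.0.1:8080") == "http://127.0.0.1:8080/ipfs/"
assert normalize_gateway("http://127.0.0.1:8080/") == "http://127.0.0.1:8080/ipfs/"
assert normalize_gateway("http://127.0.0.1:8080/ipfs/bafyfoo") == "http://127.0.0.1:8080/ipfs/"
```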

KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = 'http://127.0.0.1:8080'
KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = 'http://127.0.0.1:5001'
chunker: str
hasher: str

The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.

max_retries
initial_delay
backoff_factor
async def aclose(self) -> None:
335    async def aclose(self) -> None:
336        """
337        Closes all internally-created clients. Must be called from an async context.
338        """
339        if self._owns_client is False:  # external client → caller closes
340            return
341
342        # This method is async, so we can reliably await the async close method.
343        # The complex sync/async logic is handled by __del__.
344        for client in list(self._client_per_loop.values()):
345            if not client.is_closed:
346                try:
347                    await client.aclose()
348                except Exception:
349                    pass  # best-effort cleanup
350
351        self._client_per_loop.clear()
352        self._closed = True

Closes all internally-created clients. Must be called from an async context.
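
One consequence of the per-event-loop client cache (see `_loop_client` above) is that an internally-owned `KuboCAS` can be reused after `aclose()`: a fresh client is created lazily on the next call. A small sketch, poking at the private helper purely for illustration:

```python
import asyncio

from py_hamt import KuboCAS


async def demo() -> None:
    cas = KuboCAS()              # no client supplied, so KuboCAS owns its clients
    first = cas._loop_client()   # lazily created for this loop (private, for illustration)
    await cas.aclose()           # closes all internally-owned clients
    second = cas._loop_client()  # owned clients are lazily re-created after close
    assert first is not second
    await cas.aclose()


asyncio.run(demo())
```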

async def save( self, data: bytes, codec: Literal['raw', 'dag-cbor']) -> multiformats.cid.CID:
406    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
407        async with self._sem:
408            files = {"file": data}
409            client = self._loop_client()
410            retry_count = 0
411
412            while retry_count <= self.max_retries:
413                try:
414                    response = await client.post(
415                        self.rpc_url, files=files, timeout=60.0
416                    )
417                    response.raise_for_status()
418                    cid_str: str = response.json()["Hash"]
419                    cid: CID = CID.decode(cid_str)
420                    if cid.codec.code != self.DAG_PB_MARKER:
421                        cid = cid.set(codec=codec)
422                    return cid
423
424                except (httpx.TimeoutException, httpx.RequestError) as e:
425                    retry_count += 1
426                    if retry_count > self.max_retries:
427                        raise httpx.TimeoutException(
428                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
429                            request=e.request
430                            if isinstance(e, httpx.RequestError)
431                            else None,
432                        )
433
434                    # Calculate backoff delay
435                    delay = self.initial_delay * (
436                        self.backoff_factor ** (retry_count - 1)
437                    )
438                    # Add some jitter to prevent thundering herd
439                    jitter = delay * 0.1 * (random.random() - 0.5)
440                    await asyncio.sleep(delay + jitter)
441
442                except httpx.HTTPStatusError:
443                    # Re-raise non-timeout HTTP errors immediately
444                    raise
445        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Save data to a storage mechanism and return an ID for the data; the ID is of the IPLDKind type.

`codec` will be set to "dag-cbor" if this data should be marked as special linked data per the IPLD data model.
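
The retry delay in `save()` (and in `load()` below) grows geometrically with the retry count, plus roughly ±5% jitter (`delay * 0.1 * (random() - 0.5)`). With the defaults (`initial_delay=1.0`, `backoff_factor=2.0`, `max_retries=3`) the schedule works out as follows:

```python
initial_delay, backoff_factor, max_retries = 1.0, 2.0, 3

for retry_count in range(1, max_retries + 1):
    delay = initial_delay * (backoff_factor ** (retry_count - 1))
    print(f"retry {retry_count}: ~{delay:.1f}s (+/- 5% jitter)")
# retry 1: ~1.0s, retry 2: ~2.0s, retry 3: ~4.0s
```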

async def load( self, id: IPLDKind, offset: Optional[int] = None, length: Optional[int] = None, suffix: Optional[int] = None) -> bytes:
447    async def load(
448        self,
449        id: IPLDKind,
450        offset: Optional[int] = None,
451        length: Optional[int] = None,
452        suffix: Optional[int] = None,
453    ) -> bytes:
454        """Load data from a CID using the IPFS gateway with optional Range requests."""
455        cid = cast(CID, id)
456        url: str = f"{self.gateway_base_url + str(cid)}"
457        headers: Dict[str, str] = {}
458
459        # Construct the Range header if required
460        if offset is not None:
461            start = offset
462            if length is not None:
463                # Standard HTTP Range: bytes=start-end (inclusive)
464                end = start + length - 1
465                headers["Range"] = f"bytes={start}-{end}"
466            else:
467                # Standard HTTP Range: bytes=start- (from start to end)
468                headers["Range"] = f"bytes={start}-"
469        elif suffix is not None:
470            # Standard HTTP Range: bytes=-N (last N bytes)
471            headers["Range"] = f"bytes=-{suffix}"
472
473        async with self._sem:  # Throttle gateway
474            client = self._loop_client()
475            retry_count = 0
476
477            while retry_count <= self.max_retries:
478                try:
479                    response = await client.get(
480                        url, headers=headers or None, timeout=60.0
481                    )
482                    response.raise_for_status()
483                    return response.content
484
485                except (httpx.TimeoutException, httpx.RequestError) as e:
486                    retry_count += 1
487                    if retry_count > self.max_retries:
488                        raise httpx.TimeoutException(
489                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
490                            request=e.request
491                            if isinstance(e, httpx.RequestError)
492                            else None,
493                        )
494
495                    # Calculate backoff delay with jitter
496                    delay = self.initial_delay * (
497                        self.backoff_factor ** (retry_count - 1)
498                    )
499                    jitter = delay * 0.1 * (random.random() - 0.5)
500                    await asyncio.sleep(delay + jitter)
501
502                except httpx.HTTPStatusError:
503                    # Re-raise non-timeout HTTP errors immediately
504                    raise
505
506        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

Load data from a CID using the IPFS gateway with optional Range requests.
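
The `offset`/`length`/`suffix` parameters translate into standard HTTP Range headers exactly as in the source above; restated as a standalone helper:

```python
from typing import Optional


def range_header(
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> Optional[str]:
    if offset is not None:
        if length is not None:
            return f"bytes={offset}-{offset + length - 1}"  # inclusive end
        return f"bytes={offset}-"                           # from offset to end
    if suffix is not None:
        return f"bytes=-{suffix}"                           # last N bytes
    return None                                             # full load


assert range_header(offset=0, length=100) == "bytes=0-99"
assert range_header(offset=10) == "bytes=10-"
assert range_header(suffix=16) == "bytes=-16"
assert range_header() is None
```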

async def pin_cid( self, cid: multiformats.cid.CID, target_rpc: str = 'http://127.0.0.1:5001') -> None:
511    async def pin_cid(
512        self,
513        cid: CID,
514        target_rpc: str = "http://127.0.0.1:5001",
515    ) -> None:
516        """
517        Pins a CID to the local Kubo node via the RPC API.
518
519        This call is recursive by default, pinning all linked objects.
520
521        Args:
522            cid (CID): The Content ID to pin.
523            target_rpc (str): The RPC URL of the Kubo node.
524        """
525        params = {"arg": str(cid), "recursive": "true"}
526        pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add"
527
528        async with self._sem:  # throttle RPC
529            client = self._loop_client()
530            response = await client.post(pin_add_url_base, params=params)
531            response.raise_for_status()

Pins a CID to the local Kubo node via the RPC API.

This call is recursive by default, pinning all linked objects.

Args:
    cid (CID): The Content ID to pin.
    target_rpc (str): The RPC URL of the Kubo node.
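
A typical use is pinning a frozen HAMT root so the daemon keeps it; a hedged sketch in which `cas`, `hamt`, and the remote endpoint are assumed to exist:

```python
# Freeze the HAMT, then pin its root CID recursively.
await hamt.make_read_only()
root_cid = hamt.root_node_id
await cas.pin_cid(root_cid)  # local daemon via the default RPC URL
# Hypothetical remote daemon:
await cas.pin_cid(root_cid, target_rpc="http://kubo.example:5001")
```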

async def unpin_cid( self, cid: multiformats.cid.CID, target_rpc: str = 'http://127.0.0.1:5001') -> None:
533    async def unpin_cid(
534        self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
535    ) -> None:
536        """
537        Unpins a CID from the local Kubo node via the RPC API.
538
539        Args:
540            cid (CID): The Content ID to unpin.
541        """
542        params = {"arg": str(cid), "recursive": "true"}
543        unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm"
544        async with self._sem:  # throttle RPC
545            client = self._loop_client()
546            response = await client.post(unpin_url_base, params=params)
547            response.raise_for_status()

Unpins a CID from the local Kubo node via the RPC API.

Args:
    cid (CID): The Content ID to unpin.
    target_rpc (str): The RPC URL of the Kubo node.

async def pin_update( self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str = 'http://127.0.0.1:5001') -> None:
549    async def pin_update(
550        self,
551        old_id: IPLDKind,
552        new_id: IPLDKind,
553        target_rpc: str = "http://127.0.0.1:5001",
554    ) -> None:
555        """
556        Replaces a pinned CID with a new one on the local Kubo node via the RPC API.
557
558        Args:
559            old_id (IPLDKind): The old Content ID to replace.
560            new_id (IPLDKind): The new Content ID to pin.
561        """
562        params = {"arg": [str(old_id), str(new_id)]}
563        pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update"
564        async with self._sem:  # throttle RPC
565            client = self._loop_client()
566            response = await client.post(pin_update_url_base, params=params)
567            response.raise_for_status()

Replaces a pinned CID with a new one on the local Kubo node via the RPC API.

Args:
    old_id (IPLDKind): The old Content ID to replace.
    new_id (IPLDKind): The new Content ID to pin.
    target_rpc (str): The RPC URL of the Kubo node.
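
When a dataset is rewritten and gains a new root, `pin_update` swaps the pins in a single RPC call; `old_root` and `new_root` below are hypothetical CIDs from two versions of the same dataset:

```python
# Replace the pin on the old root with a pin on the new root.
await cas.pin_update(old_root, new_root)
```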

async def pin_ls( self, target_rpc: str = 'http://127.0.0.1:5001') -> list[typing.Dict[str, typing.Any]]:
569    async def pin_ls(
570        self, target_rpc: str = "http://127.0.0.1:5001"
571    ) -> list[Dict[str, Any]]:
572        """
573        Lists all pinned CIDs on the local Kubo node via the RPC API.
574
575        Args:
576            target_rpc (str): The RPC URL of the Kubo node.
577
578        Returns:
579            list[Dict[str, Any]]: The pin entries reported by Kubo.
580        """
581        pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
582        async with self._sem:  # throttle RPC
583            client = self._loop_client()
584            response = await client.post(pin_ls_url_base)
585            response.raise_for_status()
586            pins = response.json().get("Keys", [])
587            return pins

Lists all pinned CIDs on the local Kubo node via the RPC API.

Args:
    target_rpc (str): The RPC URL of the Kubo node.

Returns:
    list[Dict[str, Any]]: The pin entries reported by Kubo.
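
A short usage sketch; note the shape of each entry follows Kubo's `/api/v0/pin/ls` response (the `Keys` payload), which this method returns as-is:

```python
pins = await cas.pin_ls()
print(f"{len(pins)} pin entries")  # entries mirror Kubo's "Keys" payload
```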

class ZarrHAMTStore(zarr.abc.store.Store):
 13class ZarrHAMTStore(zarr.abc.store.Store):
 14    """
 15    Write and read Zarr v3s with a HAMT.
 16
 17    Read **or** write a Zarr-v3 store whose key/value pairs live inside a
 18    py-hamt mapping.
 19
 20    Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and
 21    the value is the raw byte payload produced by Zarr.  No additional
 22    framing, compression, or encryption is applied by this class. For a zarr encryption example
 23    where the metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 24    For a fully encrypted zarr store, where metadata is not available, please see
 25    :class:`SimpleEncryptedZarrHAMTStore`, but we do not recommend using it.
 26
 27    #### A note about using the same `ZarrHAMTStore` for writing and then reading again
 28    If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.
 29
 30    #### Sample Code
 31    ```python
 32    # --- Write ---
 33    ds: xarray.Dataset = # ...
 34    cas: ContentAddressedStore = # ...
 35    # make sure values_are_bytes is True and read_only is False to enable writes
 36    hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
 37    zhs  = ZarrHAMTStore(hamt, read_only=False)
 38    ds.to_zarr(store=zhs, mode="w", zarr_format=3)
 39    await hamt.make_read_only() # flush + freeze
 40    root_node_id = hamt.root_node_id
 41    print(root_node_id)
 42
 43     # --- read ---
 44    hamt_ro = await HAMT.build(
 45        cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
 46    )
 47    zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
 48    ds_ro = xarray.open_zarr(store=zhs_ro)
 49
 50
 51    print(ds_ro)
 52    xarray.testing.assert_identical(ds, ds_ro)
 53    ```
 54    """
 55
 56    _forced_read_only: bool | None = None  # sentinel for wrapper clones
 57
 58    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
 59        """
 60        ### `hamt` and `read_only`
 61        You need to make sure the following two things are true:
 62
 63        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because switching a HAMT to read only mode requires async operations, but `__init__` cannot be async.
 64        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
 65
 66        ##### A note about the zarr chunk separator, "/" vs "."
 67        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
 68
 69        #### Metadata Read Cache
 70        `ZarrHAMTStore` has an internal read cache for metadata. In practice, metadata "zarr.json" files are requested far more frequently, and more repetitively, than all other keys, and caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are much smaller than the memory required by the zarr data itself.
 71        """
 72        super().__init__(read_only=read_only)
 73
 74        assert hamt.read_only == read_only
 75        assert hamt.values_are_bytes
 76        self.hamt: HAMT = hamt
 77        """
 78        The internal HAMT.
 79        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
 80        """
 81
 82        self.metadata_read_cache: dict[str, bytes] = {}
 83        """@private"""
 84
 85    def _map_byte_request(
 86        self, byte_range: Optional[zarr.abc.store.ByteRequest]
 87    ) -> tuple[Optional[int], Optional[int], Optional[int]]:
 88        """Helper to map Zarr ByteRequest to offset, length, suffix."""
 89        offset: Optional[int] = None
 90        length: Optional[int] = None
 91        suffix: Optional[int] = None
 92
 93        if byte_range:
 94            if isinstance(byte_range, zarr.abc.store.RangeByteRequest):
 95                offset = byte_range.start
 96                length = byte_range.end - byte_range.start
 97                if length is not None and length < 0:
 98                    raise ValueError("End must be >= start for RangeByteRequest")
 99            elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest):
100                offset = byte_range.offset
101            elif isinstance(byte_range, zarr.abc.store.SuffixByteRequest):
102                suffix = byte_range.suffix
103            else:
104                raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}")
105
106        return offset, length, suffix
107
108    @property
109    def read_only(self) -> bool:  # type: ignore[override]
110        if self._forced_read_only is not None:  # instance attr overrides
111            return self._forced_read_only
112        return self.hamt.read_only
113
114    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
115        """
116        Return this store (if the flag already matches) or a *shallow*
117        clone that presents the requested read‑only status.
118
119        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
120        instance; no flushing, network traffic or async work is done.
121        """
122        # Fast path
123        if read_only == self.read_only:
124            return self  # Same mode, return same instance
125
126        # Create new instance with different read_only flag
127        # Creates a *bare* instance without running its __init__
128        clone = type(self).__new__(type(self))
129
130        # Copy attributes that matter
131        clone.hamt = self.hamt  # Share the HAMT
132        clone._forced_read_only = read_only
133        clone.metadata_read_cache = self.metadata_read_cache.copy()
134
135        # Re‑initialise the zarr base class so that Zarr sees the flag
136        zarr.abc.store.Store.__init__(clone, read_only=read_only)
137        return clone
138
139    def __eq__(self, other: object) -> bool:
140        """@private"""
141        if not isinstance(other, ZarrHAMTStore):
142            return False
143        return self.hamt.root_node_id == other.hamt.root_node_id
144
145    async def get(
146        self,
147        key: str,
148        prototype: zarr.core.buffer.BufferPrototype,
149        byte_range: zarr.abc.store.ByteRequest | None = None,
150    ) -> zarr.core.buffer.Buffer | None:
151        """@private"""
152        try:
153            val: bytes
154            # do a len check to avoid indexing into overly short strings; 3.12 does not throw errors but we don't know if other versions will
155            is_metadata: bool = (
156                len(key) >= 9 and key[-9:] == "zarr.json"
157            )  # if path ends with zarr.json
158
159            if is_metadata and byte_range is None and key in self.metadata_read_cache:
160                val = self.metadata_read_cache[key]
161            else:
162                offset, length, suffix = self._map_byte_request(byte_range)
163                val = cast(
164                    bytes,
165                    await self.hamt.get(
166                        key, offset=offset, length=length, suffix=suffix
167                    ),
168                )  # We know values received will always be bytes since we only store bytes in the HAMT
169                if is_metadata and byte_range is None:
170                    self.metadata_read_cache[key] = val
171
172            return prototype.buffer.from_bytes(val)
173        except KeyError:
174            # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases
175            return None
176        except Exception as e:
177            print(f"Error getting key '{key}' with range {byte_range}: {e}")
178            raise
179
180    async def get_partial_values(
181        self,
182        prototype: zarr.core.buffer.BufferPrototype,
183        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
184    ) -> list[zarr.core.buffer.Buffer | None]:
185        """
186        Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
187        """
188        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
189        results = await asyncio.gather(
190            *tasks, return_exceptions=False
191        )  # Set return_exceptions=True for debugging
192        return results
193
194    async def exists(self, key: str) -> bool:
195        """@private"""
196        try:
197            await self.hamt.get(key)
198            return True
199        except KeyError:
200            return False
201
202    @property
203    def supports_writes(self) -> bool:
204        """@private"""
205        return not self.hamt.read_only
206
207    @property
208    def supports_partial_writes(self) -> bool:
209        """@private"""
210        return False
211
212    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
213        """@private"""
214        if self.read_only:
215            raise Exception("Cannot write to a read-only store.")
216
217        if key in self.metadata_read_cache:
218            self.metadata_read_cache[key] = value.to_bytes()
219        await self.hamt.set(key, value.to_bytes())
220
221    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
222        """@private"""
223        if not (await self.exists(key)):
224            await self.set(key, value)
225
226    async def set_partial_values(
227        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
228    ) -> None:
229        """@private"""
230        raise NotImplementedError
231
232    @property
233    def supports_deletes(self) -> bool:
234        """@private"""
235        return not self.hamt.read_only
236
237    async def delete(self, key: str) -> None:
238        """@private"""
239        if self.read_only:
240            raise Exception("Cannot write to a read-only store.")
241        try:
242            await self.hamt.delete(key)
243            # In practice these lines never seem to be needed; creating and appending data are the only operations most zarrs actually undergo
244            # if key in self.metadata_read_cache:
245            #     del self.metadata_read_cache[key]
246        # It's fine if the key was not in the HAMT
247        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues
248        except KeyError:
249            return
250
251    @property
252    def supports_listing(self) -> bool:
253        """@private"""
254        return True
255
256    async def list(self) -> AsyncIterator[str]:
257        """@private"""
258        async for key in self.hamt.keys():
259            yield key
260
261    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
262        """@private"""
263        async for key in self.hamt.keys():
264            if key.startswith(prefix):
265                yield key
266
267    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
268        """
269        @private
270        List *immediate* children that live directly under **prefix**.
271
272        This is similar to :py:meth:`list_prefix` but collapses everything
273        below the first ``"/"`` after *prefix*.  Each child name is yielded
274        **exactly once** in the order of first appearance while scanning the
275        HAMT keys.
276
277        Parameters
278        ----------
279        prefix : str
280            Logical directory path.  *Must* end with ``"/"`` for the result to
281            make sense (e.g. ``"a/b/"``).
282
283        Yields
284        ------
285        str
286            The name of each direct child (file or sub-directory) of *prefix*.
287
288        Examples
289        --------
290        With keys ::
291
292            a/b/c/d
293            a/b/c/e
294            a/b/f
295            a/b/g/h/i
296
297        ``await list_dir("a/b/")`` produces ::
298
299            c
300            f
301            g
302
303        Notes
304        -----
305        • Internally uses a :class:`set` to deduplicate names; memory grows
306            with the number of *unique* children, not the total number of keys.
307        • Order is **not** sorted; it reflects the first encounter while
308            iterating over :py:meth:`HAMT.keys`.
309        """
310        seen_names: set[str] = set()
311        async for key in self.hamt.keys():
312            if key.startswith(prefix):
313                suffix: str = key[len(prefix) :]
314                first_slash: int = suffix.find("/")
315                if first_slash == -1:
316                    if suffix not in seen_names:
317                        seen_names.add(suffix)
318                        yield suffix
319                else:
320                    name: str = suffix[0:first_slash]
321                    if name not in seen_names:
322                        seen_names.add(name)
323                        yield name

Write and read Zarr v3s with a HAMT.

Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.

Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT) and the value is the raw byte payload produced by Zarr. No additional framing, compression, or encryption is applied by this class. For a zarr encryption example where the metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb For a fully encrypted zarr store, where metadata is not available, please see SimpleEncryptedZarrHAMTStore, but we do not recommend using it.

A note about using the same ZarrHAMTStore for writing and then reading again

If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.

Sample Code

# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
# make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
zhs  = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)

 # --- read ---
hamt_ro = await HAMT.build(
    cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)


print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
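
Internally, `_map_byte_request` (see the source above) translates Zarr's ByteRequest variants into the `(offset, length, suffix)` triple that `HAMT.get` understands. A standalone sketch of that mapping, assuming the ByteRequest dataclasses accept positional arguments:

```python
from typing import Optional

from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest


def map_byte_request(byte_range) -> tuple[Optional[int], Optional[int], Optional[int]]:
    offset = length = suffix = None
    if isinstance(byte_range, RangeByteRequest):
        offset = byte_range.start
        length = byte_range.end - byte_range.start  # end is exclusive here
    elif isinstance(byte_range, OffsetByteRequest):
        offset = byte_range.offset
    elif isinstance(byte_range, SuffixByteRequest):
        suffix = byte_range.suffix
    return offset, length, suffix


assert map_byte_request(RangeByteRequest(0, 100)) == (0, 100, None)
assert map_byte_request(OffsetByteRequest(10)) == (10, None, None)
assert map_byte_request(SuffixByteRequest(16)) == (None, None, 16)
```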
ZarrHAMTStore(hamt: HAMT, read_only: bool = False)
58    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
59        """
60        ### `hamt` and `read_only`
61        You need to make sure the following two things are true:
62
 63        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because switching a HAMT to read only mode requires async operations, but `__init__` cannot be async.
64        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
65
66        ##### A note about the zarr chunk separator, "/" vs "."
67        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
68
69        #### Metadata Read Cache
70        `ZarrHAMTStore` has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more frequently, and far more repetitively, than any other key, so this cache yields significant speed improvements. In terms of memory management, the cache does not need an eviction step in practice, since "zarr.json" files are much smaller than the memory required by the Zarr data itself.
71        """
72        super().__init__(read_only=read_only)
73
74        assert hamt.read_only == read_only
75        assert hamt.values_are_bytes
76        self.hamt: HAMT = hamt
77        """
78        The internal HAMT.
79        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
80        """
81
82        self.metadata_read_cache: dict[str, bytes] = {}
83        """@private"""

hamt and read_only

You need to make sure the following two things are true:

  1. The HAMT is in the same read-only mode that you are passing into the Zarr store, i.e. hamt.read_only == read_only. This is required because switching a HAMT's read-only mode involves async operations, but __init__ cannot be async.
  2. The HAMT has hamt.values_are_bytes == True. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."

While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

Metadata Read Cache

ZarrHAMTStore has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more frequently, and far more repetitively, than any other key, so this cache yields significant speed improvements. In terms of memory management, the cache does not need an eviction step in practice, since "zarr.json" files are much smaller than the memory required by the Zarr data itself.

hamt: HAMT

The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.

read_only: bool
108    @property
109    def read_only(self) -> bool:  # type: ignore[override]
110        if self._forced_read_only is not None:  # instance attr overrides
111            return self._forced_read_only
112        return self.hamt.read_only

Is the store read-only?

def with_read_only(self, read_only: bool = False) -> ZarrHAMTStore:
114    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
115        """
116        Return this store (if the flag already matches) or a *shallow*
117        clone that presents the requested read‑only status.
118
119        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
120        instance; no flushing, network traffic or async work is done.
121        """
122        # Fast path
123        if read_only == self.read_only:
124            return self  # Same mode, return same instance
125
126        # Create new instance with different read_only flag
127        # Creates a *bare* instance without running its __init__
128        clone = type(self).__new__(type(self))
129
130        # Copy attributes that matter
131        clone.hamt = self.hamt  # Share the HAMT
132        clone._forced_read_only = read_only
133        clone.metadata_read_cache = self.metadata_read_cache.copy()
134
135        # Re‑initialise the zarr base class so that Zarr sees the flag
136        zarr.abc.store.Store.__init__(clone, read_only=read_only)
137        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same py_hamt.hamt.HAMT instance; no flushing, network traffic, or async work is done.
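
A hedged usage sketch (assuming the write-mode `zhs` from the sample code above):

```python
# Synchronous, cheap view switch: the clone shares the same HAMT
# (the metadata read cache is copied); no flushing or network traffic.
zhs_ro = zhs.with_read_only(True)
assert zhs_ro.read_only and not zhs.read_only
# Fast path: if the flag already matches, the same instance comes back.
assert zhs.with_read_only(False) is zhs
```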

async def get_partial_values( self, prototype: zarr.core.buffer.core.BufferPrototype, key_ranges: Iterable[tuple[str, zarr.abc.store.RangeByteRequest | zarr.abc.store.OffsetByteRequest | zarr.abc.store.SuffixByteRequest | None]]) -> list[zarr.core.buffer.core.Buffer | None]:
180    async def get_partial_values(
181        self,
182        prototype: zarr.core.buffer.BufferPrototype,
183        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
184    ) -> list[zarr.core.buffer.Buffer | None]:
185        """
186        Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
187        """
188        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
189        results = await asyncio.gather(
190            *tasks, return_exceptions=False
191        )  # Set return_exceptions=True for debugging
192        return results

Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
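
A hedged usage sketch (assuming the read-mode `zhs_ro` from the sample code above; the byte-request classes and `default_buffer_prototype` are zarr's stock types):

```python
from zarr.abc.store import OffsetByteRequest, RangeByteRequest
from zarr.core.buffer import default_buffer_prototype

proto = default_buffer_prototype()
buffers = await zhs_ro.get_partial_values(
    proto,
    [
        ("zarr.json", None),                         # whole value
        ("temp/c/0/0/0", RangeByteRequest(0, 128)),  # bytes [0, 128)
        ("temp/c/0/0/1", OffsetByteRequest(64)),     # from offset 64 onward
    ],
)
# Results come back in request order; missing keys are None.
```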

class SimpleEncryptedZarrHAMTStore(py_hamt.ZarrHAMTStore):
 13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore):
 14    """
 15    Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.
 16
 17    This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
 18    stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
 19    and data chunks. This provides strong privacy but means the Zarr store is
 20    completely opaque and unusable without the correct encryption key and header.
 21
 22    Note: For standard zarr encryption and decryption where metadata is available, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
 23
 24    #### Encryption Details
 25    - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
 26    - Requires a 32-byte encryption key and a header.
 27    - Each encrypted value includes a 24-byte nonce and a 16-byte tag.
 28
 29    #### Important Considerations
 30    - Since metadata is encrypted, standard Zarr tools cannot inspect the
 31      dataset without prior decryption using this store class.
 32    - There is no support for partial encryption or excluding variables.
 33    - Metadata reads ("zarr.json") are cached in memory, as in `ZarrHAMTStore`.
 34
 35    #### Sample Code
 36    ```python
 37    import xarray
 38    from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
 39    from Crypto.Random import get_random_bytes
 40    import numpy as np
 41
 42    # Setup
 43    ds = xarray.Dataset(
 44        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
 45        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
 46    )
 47    cas = KuboCAS() # Example ContentAddressedStore
 48    encryption_key = get_random_bytes(32)
 49    header = b"fully-encrypted-zarr"
 50
 51    # --- Write ---
 52    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
 53    ezhs_write = SimpleEncryptedZarrHAMTStore(
 54        hamt_write, False, encryption_key, header
 55    )
 56    print("Writing fully encrypted Zarr...")
 57    ds.to_zarr(store=ezhs_write, mode="w")
 58    await hamt_write.make_read_only()
 59    root_node_id = hamt_write.root_node_id
 60    print(f"Wrote Zarr with root: {root_node_id}")
 61
 62    # --- Read ---
 63    hamt_read = await HAMT.build(
 64            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
 65        )
 66    ezhs_read = SimpleEncryptedZarrHAMTStore(
 67        hamt_read, True, encryption_key, header
 68    )
 69    print("\nReading fully encrypted Zarr...")
 70    ds_read = xarray.open_zarr(store=ezhs_read)
 71    print("Read back dataset:")
 72    print(ds_read)
 73    xarray.testing.assert_identical(ds, ds_read)
 74    print("Read successful and data verified.")
 75
 76    # --- Read with wrong key (demonstrates failure) ---
 77    wrong_key = get_random_bytes(32)
 78    hamt_bad = await HAMT.build(
 79        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
 80    )
 81    ezhs_bad = SimpleEncryptedZarrHAMTStore(
 82        hamt_bad, True, wrong_key, header
 83    )
 84    print("\nAttempting to read with wrong key...")
 85    try:
 86        ds_bad = xarray.open_zarr(store=ezhs_bad)
 87        print(ds_bad)
 88    except Exception as e:
 89        print(f"Failed to read as expected: {type(e).__name__} - {e}")
 90    ```
 91    """
 92
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}
113
114    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
115        if read_only == self.read_only:
116            return self
117
118        clone = type(self).__new__(type(self))
119        clone.hamt = self.hamt
120        clone.encryption_key = self.encryption_key
121        clone.header = self.header
122        clone.metadata_read_cache = self.metadata_read_cache
123        clone._forced_read_only = read_only  # safe; attribute is declared
124        zarr.abc.store.Store.__init__(clone, read_only=read_only)
125        return clone
126
127    def _encrypt(self, val: bytes) -> bytes:
128        """Encrypts data using ChaCha20-Poly1305."""
129        nonce = get_random_bytes(24)  # XChaCha20 uses a 24-byte nonce
130        cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
131        cipher.update(self.header)
132        ciphertext, tag = cipher.encrypt_and_digest(val)
133        return nonce + tag + ciphertext
134
135    def _decrypt(self, val: bytes) -> bytes:
136        """Decrypts data using ChaCha20-Poly1305."""
137        try:
138            # Extract nonce (24), tag (16), and ciphertext
139            nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
140            cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
141            cipher.update(self.header)
142            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
143            return plaintext
144        except Exception as e:
145            # Catching a broad exception as various issues (key, tag, length) can occur.
146            raise ValueError(
147                "Decryption failed. Check key, header, or data integrity."
148            ) from e
149
150    def __eq__(self, other: object) -> bool:
151        """@private"""
152        if not isinstance(other, SimpleEncryptedZarrHAMTStore):
153            return False
154        return (
155            self.hamt.root_node_id == other.hamt.root_node_id
156            and self.encryption_key == other.encryption_key
157            and self.header == other.header
158        )
159
160    async def get(
161        self,
162        key: str,
163        prototype: zarr.core.buffer.BufferPrototype,
164        byte_range: zarr.abc.store.ByteRequest | None = None,
165    ) -> zarr.core.buffer.Buffer | None:
166        """@private"""
167        try:
168            decrypted_val: bytes
169            is_metadata: bool = (
170                len(key) >= 9 and key[-9:] == "zarr.json"
171            )  # if path ends with zarr.json
172
173            if is_metadata and key in self.metadata_read_cache:
174                decrypted_val = self.metadata_read_cache[key]
175            else:
176                raw_val = cast(
177                    bytes, await self.hamt.get(key)
178                )  # We know values received will always be bytes since we only store bytes in the HAMT
179                decrypted_val = self._decrypt(raw_val)
180                if is_metadata:
181                    self.metadata_read_cache[key] = decrypted_val
182            return prototype.buffer.from_bytes(decrypted_val)
183        except KeyError:
184            return None
185
186    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
187        """@private"""
188        if self.read_only:
189            raise Exception("Cannot write to a read-only store.")
190
191        raw_bytes = value.to_bytes()
192        if key in self.metadata_read_cache:
193            self.metadata_read_cache[key] = raw_bytes
194        # Encrypt it
195        encrypted_bytes = self._encrypt(raw_bytes)
196        await self.hamt.set(key, encrypted_bytes)

Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.

This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.

Note: For standard zarr encryption and decryption where metadata is available, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb

#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.
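
As a hedged illustration of this layout (nonce | tag | ciphertext, with the header as associated data), here is a standalone pycryptodome round trip mirroring the `_encrypt`/`_decrypt` methods in the source listing above; the key and plaintext are illustrative:

```python
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Random import get_random_bytes

key = get_random_bytes(32)            # 32-byte encryption key
header = b"fully-encrypted-zarr"      # associated data, never encrypted

# Encrypt: a 24-byte nonce selects XChaCha20-Poly1305 in pycryptodome
nonce = get_random_bytes(24)
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(header)
ciphertext, tag = cipher.encrypt_and_digest(b"chunk bytes")
blob = nonce + tag + ciphertext       # the value actually stored

# Decrypt: split the fixed-size prefix fields back out
n, t, ct = blob[:24], blob[24:40], blob[40:]
cipher = ChaCha20_Poly1305.new(key=key, nonce=n)
cipher.update(header)
assert cipher.decrypt_and_verify(ct, t) == b"chunk bytes"
```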

#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
  dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- Metadata reads ("zarr.json") are cached in memory, as in ZarrHAMTStore.

#### Sample Code

    import xarray
    from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
    from Crypto.Random import get_random_bytes
    import numpy as np

    # Setup
    ds = xarray.Dataset(
        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
    )
    cas = KuboCAS() # Example ContentAddressedStore
    encryption_key = get_random_bytes(32)
    header = b"fully-encrypted-zarr"

    # --- Write ---
    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
    ezhs_write = SimpleEncryptedZarrHAMTStore(
        hamt_write, False, encryption_key, header
    )
    print("Writing fully encrypted Zarr...")
    ds.to_zarr(store=ezhs_write, mode="w")
    await hamt_write.make_read_only()
    root_node_id = hamt_write.root_node_id
    print(f"Wrote Zarr with root: {root_node_id}")

    # --- Read ---
    hamt_read = await HAMT.build(
            cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
        )
    ezhs_read = SimpleEncryptedZarrHAMTStore(
        hamt_read, True, encryption_key, header
    )
    print("
Reading fully encrypted Zarr...")
    ds_read = xarray.open_zarr(store=ezhs_read)
    print("Read back dataset:")
    print(ds_read)
    xarray.testing.assert_identical(ds, ds_read)
    print("Read successful and data verified.")

    # --- Read with wrong key (demonstrates failure) ---
    wrong_key = get_random_bytes(32)
    hamt_bad = await HAMT.build(
        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    ezhs_bad = SimpleEncryptedZarrHAMTStore(
        hamt_bad, True, wrong_key, header
    )
    print("
Attempting to read with wrong key...")
    try:
        ds_bad = xarray.open_zarr(store=ezhs_bad)
        print(ds_bad)
    except Exception as e:
        print(f"Failed to read as expected: {type(e).__name__} - {e}")
SimpleEncryptedZarrHAMTStore( hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes)
 93    def __init__(
 94        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
 95    ) -> None:
 96        """
 97        Initializes the SimpleEncryptedZarrHAMTStore.
 98
 99        Args:
100            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
101                  Its `read_only` status must match the `read_only` argument.
102            read_only: If True, the store is in read-only mode.
103            encryption_key: A 32-byte key for ChaCha20-Poly1305.
104            header: A header (bytes) used as associated data in encryption.
105        """
106        super().__init__(hamt, read_only=read_only)
107
108        if len(encryption_key) != 32:
109            raise ValueError("Encryption key must be exactly 32 bytes long.")
110        self.encryption_key = encryption_key
111        self.header = header
112        self.metadata_read_cache: dict[str, bytes] = {}

Initializes the SimpleEncryptedZarrHAMTStore.

Args:

  - hamt: The HAMT instance for storage. Must have values_are_bytes=True. Its read_only status must match the read_only argument.
  - read_only: If True, the store is in read-only mode.
  - encryption_key: A 32-byte key for ChaCha20-Poly1305.
  - header: A header (bytes) used as associated data in encryption.

encryption_key
header
def with_read_only( self, read_only: bool = False) -> SimpleEncryptedZarrHAMTStore:
114    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
115        if read_only == self.read_only:
116            return self
117
118        clone = type(self).__new__(type(self))
119        clone.hamt = self.hamt
120        clone.encryption_key = self.encryption_key
121        clone.header = self.header
122        clone.metadata_read_cache = self.metadata_read_cache
123        clone._forced_read_only = read_only  # safe; attribute is declared
124        zarr.abc.store.Store.__init__(clone, read_only=read_only)
125        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same py_hamt.hamt.HAMT instance; no flushing, network traffic, or async work is done.

class ShardedZarrStore(zarr.abc.store.Store):
143class ShardedZarrStore(zarr.abc.store.Store):
144    """
145    Implements the Zarr Store API using a sharded layout for chunk CIDs.
146
147    This store divides the flat index of chunk CIDs into multiple "shards".
148    Each shard is a DAG-CBOR array where each element is either a CID link
149    to a chunk or a null value if the chunk is empty. This structure allows
150    for efficient traversal by IPLD-aware systems.
151
152    The store's root object contains:
153    1.  A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
154    2.  A list of CIDs, where each CID points to a shard object.
155    3.  Sharding configuration details (e.g., chunks_per_shard).
156    """
157
158    def __init__(
159        self,
160        cas: ContentAddressedStore,
161        read_only: bool,
162        root_cid: Optional[str] = None,
163        *,
164        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
165    ):
166        """Use the async `open()` classmethod to instantiate this class."""
167        super().__init__(read_only=read_only)
168        self.cas = cas
169        self._root_cid = root_cid
170        self._root_obj: dict
171
172        self._resize_lock = asyncio.Lock()
173        # An event to signal when a resize is in-progress.
174        # It starts in the "set" state, allowing all operations to proceed.
175        self._resize_complete = asyncio.Event()
176        self._resize_complete.set()
177        self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
178
179        self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
180        self._pending_shard_loads: Dict[int, asyncio.Event] = {}
181
182        self._array_shape: Tuple[int, ...]
183        self._chunk_shape: Tuple[int, ...]
184        self._chunks_per_dim: Tuple[int, ...]
185        self._chunks_per_shard: int
186        self._num_shards: int = 0
187        self._total_chunks: int = 0
188
189        self._dirty_root = False
190
191    def __update_geometry(self):
192        """Calculates derived geometric properties from the base shapes."""
193
194        if not all(cs > 0 for cs in self._chunk_shape):
195            raise ValueError("All chunk_shape dimensions must be positive.")
196        if not all(s >= 0 for s in self._array_shape):
197            raise ValueError("All array_shape dimensions must be non-negative.")
198
199        self._chunks_per_dim = tuple(
200            math.ceil(a / c) if c > 0 else 0
201            for a, c in zip(self._array_shape, self._chunk_shape)
202        )
203        self._total_chunks = math.prod(self._chunks_per_dim)
204
205        if self._total_chunks != 0:
206            self._num_shards = (
207                self._total_chunks + self._chunks_per_shard - 1
208            ) // self._chunks_per_shard
209
210    @classmethod
211    async def open(
212        cls,
213        cas: ContentAddressedStore,
214        read_only: bool,
215        root_cid: Optional[str] = None,
216        *,
217        array_shape: Optional[Tuple[int, ...]] = None,
218        chunk_shape: Optional[Tuple[int, ...]] = None,
219        chunks_per_shard: Optional[int] = None,
220        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
221    ) -> "ShardedZarrStore":
222        """
223        Asynchronously opens an existing ShardedZarrStore or initializes a new one.
224        """
225        store = cls(
226            cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
227        )
228        if root_cid:
229            await store._load_root_from_cid()
230        elif not read_only:
231            if array_shape is None or chunk_shape is None:
232                raise ValueError(
233                    "array_shape and chunk_shape must be provided for a new store."
234                )
235
236            if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0:
237                raise ValueError("chunks_per_shard must be a positive integer.")
238
239            store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
240        else:
241            raise ValueError("root_cid must be provided for a read-only store.")
242        return store
243
244    def _initialize_new_root(
245        self,
246        array_shape: Tuple[int, ...],
247        chunk_shape: Tuple[int, ...],
248        chunks_per_shard: int,
249    ):
250        self._array_shape = array_shape
251        self._chunk_shape = chunk_shape
252        self._chunks_per_shard = chunks_per_shard
253
254        self.__update_geometry()
255
256        self._root_obj = {
257            "manifest_version": "sharded_zarr_v1",
258            "metadata": {},
259            "chunks": {
260                "array_shape": list(self._array_shape),
261                "chunk_shape": list(self._chunk_shape),
262                "sharding_config": {
263                    "chunks_per_shard": self._chunks_per_shard,
264                },
265                "shard_cids": [None] * self._num_shards,
266            },
267        }
268        self._dirty_root = True
269
270    async def _load_root_from_cid(self):
271        root_bytes = await self.cas.load(self._root_cid)
272        try:
273            self._root_obj = dag_cbor.decode(root_bytes)
274            if not isinstance(self._root_obj, dict) or "chunks" not in self._root_obj:
275                raise ValueError(
276                    "Root object is not a valid dictionary with 'chunks' key."
277                )
278            if not isinstance(self._root_obj["chunks"]["shard_cids"], list):
279                raise ValueError("shard_cids is not a list.")
280        except Exception as e:
281            raise ValueError(f"Failed to decode root object: {e}")
282
283        if self._root_obj.get("manifest_version") != "sharded_zarr_v1":
284            raise ValueError(
285                f"Incompatible manifest version: {self._root_obj.get('manifest_version')}. Expected 'sharded_zarr_v1'."
286            )
287
288        chunk_info = self._root_obj["chunks"]
289        self._array_shape = tuple(chunk_info["array_shape"])
290        self._chunk_shape = tuple(chunk_info["chunk_shape"])
291        self._chunks_per_shard = chunk_info["sharding_config"]["chunks_per_shard"]
292
293        self.__update_geometry()
294
295        if len(chunk_info["shard_cids"]) != self._num_shards:
296            raise ValueError(
297                f"Inconsistent number of shards. Expected {self._num_shards}, found {len(chunk_info['shard_cids'])}."
298            )
299
300    async def _fetch_and_cache_full_shard(
301        self,
302        shard_idx: int,
303        shard_cid: str,
304        max_retries: int = 3,
305        retry_delay: float = 1.0,
306    ) -> None:
307        """
308        Fetch a shard from CAS and cache it, with retry logic for transient errors.
309
310        Args:
311            shard_idx: The index of the shard to fetch.
312            shard_cid: The CID of the shard.
313            max_retries: Maximum number of retry attempts for transient errors.
314            retry_delay: Delay between retry attempts in seconds.
315        """
316        for attempt in range(max_retries):
317            try:
318                shard_data_bytes = await self.cas.load(shard_cid)
319                decoded_shard = dag_cbor.decode(shard_data_bytes)
320                if not isinstance(decoded_shard, list):
321                    raise TypeError(f"Shard {shard_idx} did not decode to a list.")
322                await self._shard_data_cache.put(shard_idx, decoded_shard)
323                # Always set the Event to unblock waiting coroutines
324                if shard_idx in self._pending_shard_loads:
325                    self._pending_shard_loads[shard_idx].set()
326                    del self._pending_shard_loads[shard_idx]
327                return  # Success
328            except (ConnectionError, TimeoutError) as e:
329                # Handle transient errors (e.g., network issues)
330                if attempt < max_retries - 1:
331                    await asyncio.sleep(
332                        retry_delay * (2**attempt)
333                    )  # Exponential backoff
334                    continue
335                else:
336                    raise RuntimeError(
337                        f"Failed to fetch shard {shard_idx} after {max_retries} attempts: {e}"
338                    )
339
340    def _parse_chunk_key(self, key: str) -> Optional[Tuple[int, ...]]:
341        # 1. Exclude .json files immediately (metadata)
342        if key.endswith(".json"):
343            return None
344        excluded_array_prefixes = {"time", "lat", "lon", "latitude", "longitude"}
345
346        chunk_marker = "/c/"
347        marker_idx = key.rfind(chunk_marker)  # Use rfind for robustness
348        if marker_idx == -1:
349            # Key does not contain "/c/", so it's not a chunk data key
350            # in the expected format (e.g., could be .zattrs, .zgroup at various levels).
351            return None
352
353        # Extract the part of the key before "/c/", which might represent the array/group path
354        # e.g., "temp" from "temp/c/0/0/0"
355        # e.g., "group1/lat" from "group1/lat/c/0"
356        # e.g., "" if key is "c/0/0/0" (root array)
357        path_before_c = key[:marker_idx]
358
359        # Determine the actual array name (the last component of the path before "/c/")
360        actual_array_name = ""
361        if path_before_c:
362            actual_array_name = path_before_c.split("/")[-1]
363
364        # 2. If the determined array name is in our exclusion list, return None.
365        if actual_array_name in excluded_array_prefixes:
366            return None
367
368        # The part after "/c/" contains the chunk coordinates
369        coord_part = key[marker_idx + len(chunk_marker) :]
370        parts = coord_part.split("/")
371
372        coords = tuple(map(int, parts))
373        # Validate coordinates against the chunk grid of the store's configured array
374        for i, c_coord in enumerate(coords):
375            if not (0 <= c_coord < self._chunks_per_dim[i]):
376                raise IndexError(
377                    f"Chunk coordinate {c_coord} at dimension {i} is out of bounds for dimension size {self._chunks_per_dim[i]}."
378                )
379        return coords
380
381    def _get_linear_chunk_index(self, chunk_coords: Tuple[int, ...]) -> int:
382        linear_index = 0
383        multiplier = 1
384        # Convert N-D chunk coordinates to a flat 1-D index (row-major order)
385        for i in reversed(range(len(self._chunks_per_dim))):
386            linear_index += chunk_coords[i] * multiplier
387            multiplier *= self._chunks_per_dim[i]
388        return linear_index
389
390    def _get_shard_info(self, linear_chunk_index: int) -> Tuple[int, int]:
391        shard_idx = linear_chunk_index // self._chunks_per_shard
392        index_in_shard = linear_chunk_index % self._chunks_per_shard
393        return shard_idx, index_in_shard
394
395    async def _load_or_initialize_shard_cache(
396        self, shard_idx: int
397    ) -> List[Optional[CID]]:
398        """
399        Load a shard into the cache or initialize an empty shard if it doesn't exist.
400
401        Args:
402            shard_idx: The index of the shard to load or initialize.
403
404        Returns:
405            List[Optional[CID]]: The shard data (list of CIDs or None).
406
407        Raises:
408            ValueError: If the shard index is out of bounds.
409            RuntimeError: If the shard cannot be loaded or initialized.
410        """
411        cached_shard = await self._shard_data_cache.get(shard_idx)
412        if cached_shard is not None:
413            return cached_shard
414
415        if shard_idx in self._pending_shard_loads:
416            try:
417                # Wait for the pending load with a timeout (e.g., 60 seconds)
418                await asyncio.wait_for(
419                    self._pending_shard_loads[shard_idx].wait(), timeout=60.0
420                )
421                cached_shard = await self._shard_data_cache.get(shard_idx)
422                if cached_shard is not None:
423                    return cached_shard
424                else:
425                    raise RuntimeError(
426                        f"Shard {shard_idx} not found in cache after pending load completed."
427                    )
428            except asyncio.TimeoutError:
429                # Clean up the pending load to allow retry
430                if shard_idx in self._pending_shard_loads:
431                    self._pending_shard_loads[shard_idx].set()
432                    del self._pending_shard_loads[shard_idx]
433                raise RuntimeError(f"Timeout waiting for shard {shard_idx} to load.")
434
435        if not (0 <= shard_idx < self._num_shards):
436            raise ValueError(f"Shard index {shard_idx} out of bounds.")
437
438        shard_cid_obj = self._root_obj["chunks"]["shard_cids"][shard_idx]
439        if shard_cid_obj:
440            self._pending_shard_loads[shard_idx] = asyncio.Event()
441            shard_cid_str = str(shard_cid_obj)
442            await self._fetch_and_cache_full_shard(shard_idx, shard_cid_str)
443        else:
444            empty_shard = [None] * self._chunks_per_shard
445            await self._shard_data_cache.put(shard_idx, empty_shard)
446
447        result = await self._shard_data_cache.get(shard_idx)
448        if result is None:
449            raise RuntimeError(f"Failed to load or initialize shard {shard_idx}")
450        return result  # type: ignore[return-value]
451
452    async def set_partial_values(
453        self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
454    ) -> None:
455        raise NotImplementedError(
456            "Partial writes are not supported by ShardedZarrStore."
457        )
458
459    async def get_partial_values(
460        self,
461        prototype: zarr.core.buffer.BufferPrototype,
462        key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
463    ) -> List[Optional[zarr.core.buffer.Buffer]]:
464        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
465        results = await asyncio.gather(*tasks)
466        return results
467
468    def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
469        """
470        Return this store (if the flag already matches) or a *shallow*
471        clone that presents the requested read‑only status.
472
473        The clone **shares** the same CAS instance and internal state;
474        no flushing, network traffic or async work is done.
475        """
476        # Fast path
477        if read_only == self.read_only:
478            return self  # Same mode, return same instance
479
480        # Create new instance with different read_only flag
481        # Creates a *bare* instance without running its __init__
482        clone = type(self).__new__(type(self))
483
484        # Copy all attributes from the current instance
485        clone.cas = self.cas
486        clone._root_cid = self._root_cid
487        clone._root_obj = self._root_obj
488
489        clone._resize_lock = self._resize_lock
490        clone._resize_complete = self._resize_complete
491        clone._shard_locks = self._shard_locks
492
493        clone._shard_data_cache = self._shard_data_cache
494        clone._pending_shard_loads = self._pending_shard_loads
495
496        clone._array_shape = self._array_shape
497        clone._chunk_shape = self._chunk_shape
498        clone._chunks_per_dim = self._chunks_per_dim
499        clone._chunks_per_shard = self._chunks_per_shard
500        clone._num_shards = self._num_shards
501        clone._total_chunks = self._total_chunks
502
503        clone._dirty_root = self._dirty_root
504
505        # Re‑initialise the zarr base class so that Zarr sees the flag
506        zarr.abc.store.Store.__init__(clone, read_only=read_only)
507        return clone
508
509    def __eq__(self, other: object) -> bool:
510        if not isinstance(other, ShardedZarrStore):
511            return False
512        # For equality, root CID is primary. Config like chunks_per_shard is part of that root's identity.
513        return self._root_cid == other._root_cid
514
515    # Flush any dirty shards and root; if nothing is dirty, return the existing root CID.
516    async def flush(self) -> str:
517        async with self._shard_data_cache._cache_lock:
518            dirty_shards = list(self._shard_data_cache._dirty_shards)
519        if dirty_shards:
520            for shard_idx in sorted(dirty_shards):
521                # Get the list of CIDs/Nones from the cache
522                shard_data_list = await self._shard_data_cache.get(shard_idx)
523                if shard_data_list is None:
524                    raise RuntimeError(f"Dirty shard {shard_idx} not found in cache")
525
526                # Encode this list into a DAG-CBOR byte representation
527                shard_data_bytes = dag_cbor.encode(shard_data_list)
528
529                # Save the DAG-CBOR block and get its CID
530                new_shard_cid_obj = await self.cas.save(
531                    shard_data_bytes,
532                    codec="dag-cbor",  # Use 'dag-cbor' codec
533                )
534
535                if (
536                    self._root_obj["chunks"]["shard_cids"][shard_idx]
537                    != new_shard_cid_obj
538                ):
539                    # Store the CID object directly
540                    self._root_obj["chunks"]["shard_cids"][shard_idx] = (
541                        new_shard_cid_obj
542                    )
543                    self._dirty_root = True
544                    # Mark shard as clean after flushing
545                    await self._shard_data_cache.mark_clean(shard_idx)
546
547        if self._dirty_root:
548            # Ensure all metadata CIDs are CID objects for correct encoding
549            self._root_obj["metadata"] = {
550                k: (CID.decode(v) if isinstance(v, str) else v)
551                for k, v in self._root_obj["metadata"].items()
552            }
553            root_obj_bytes = dag_cbor.encode(self._root_obj)
554            new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor")
555            self._root_cid = str(new_root_cid)
556            self._dirty_root = False
557
558        # Ignore because root_cid will always exist after initialization or flush.
559        return self._root_cid  # type: ignore[return-value]
560
561    async def get(
562        self,
563        key: str,
564        prototype: zarr.core.buffer.BufferPrototype,
565        byte_range: Optional[zarr.abc.store.ByteRequest] = None,
566    ) -> Optional[zarr.core.buffer.Buffer]:
567        chunk_coords = self._parse_chunk_key(key)
568        # Metadata request
569        if chunk_coords is None:
570            metadata_cid_obj = self._root_obj["metadata"].get(key)
571            if metadata_cid_obj is None:
572                return None
573            if byte_range is not None:
574                raise ValueError(
575                    "Byte range requests are not supported for metadata keys."
576                )
577            data = await self.cas.load(str(metadata_cid_obj))
578            return prototype.buffer.from_bytes(data)
579        # Chunk data request
580        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
581        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
582
583        # This will load the full shard into cache if it's not already there.
584        shard_lock = self._shard_locks[shard_idx]
585        async with shard_lock:
586            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
587
588        # Get the CID object (or None) from the cached list.
589        chunk_cid_obj = target_shard_list[index_in_shard]
590
591        if chunk_cid_obj is None:
592            return None  # Chunk is empty/doesn't exist.
593
594        chunk_cid_str = str(chunk_cid_obj)
595
596        req_offset = None
597        req_length = None
598        req_suffix = None
599
600        if byte_range:
601            if isinstance(byte_range, RangeByteRequest):
602                req_offset = byte_range.start
603                if byte_range.end is not None:
604                    if byte_range.start > byte_range.end:
605                        raise ValueError(
606                            f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})"
607                        )
608                    req_length = byte_range.end - byte_range.start
609            elif isinstance(byte_range, OffsetByteRequest):
610                req_offset = byte_range.offset
611            elif isinstance(byte_range, SuffixByteRequest):
612                req_suffix = byte_range.suffix
613        data = await self.cas.load(
614            chunk_cid_str, offset=req_offset, length=req_length, suffix=req_suffix
615        )
616        return prototype.buffer.from_bytes(data)
617
618    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
619        if self.read_only:
620            raise PermissionError("Cannot write to a read-only store.")
621        await self._resize_complete.wait()
622
623        if (
624            key.endswith("zarr.json")
625            and not key.startswith("time/")
626            and not key.startswith(("lat/", "latitude/"))
627            and not key.startswith(("lon/", "longitude/"))
628            and not key == "zarr.json"
629        ):
630            metadata_json = json.loads(value.to_bytes().decode("utf-8"))
631            new_array_shape = metadata_json.get("shape")
632            if not new_array_shape:
633                raise ValueError("Shape not found in metadata.")
634            if tuple(new_array_shape) != self._array_shape:
635                async with self._resize_lock:
636                    # Double-check after acquiring the lock, in case another task
637                    # just finished this exact resize while we were waiting.
638                    if tuple(new_array_shape) != self._array_shape:
639                        # Block all other tasks until resize is complete.
640                        self._resize_complete.clear()
641                        try:
642                            await self.resize_store(new_shape=tuple(new_array_shape))
643                        finally:
644                            # All waiting tasks will now un-pause and proceed safely.
645                            self._resize_complete.set()
646
647        raw_data_bytes = value.to_bytes()
648        # Save the data to CAS first to get its CID.
649        # Metadata is often saved as 'raw', chunks as well unless compressed.
650        try:
651            data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
652            await self.set_pointer(key, str(data_cid_obj))
653        except Exception as e:
654            raise RuntimeError(f"Failed to save data for key {key}: {e}")
655        return None  # type: ignore[return-value]
656
657    async def set_pointer(self, key: str, pointer: str) -> None:
658        chunk_coords = self._parse_chunk_key(key)
659
660        pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object
661
662        if chunk_coords is None:  # Metadata key
663            self._root_obj["metadata"][key] = pointer_cid_obj
664            self._dirty_root = True
665            return None
666
667        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
668        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
669
670        shard_lock = self._shard_locks[shard_idx]
671        async with shard_lock:
672            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
673
674            if target_shard_list[index_in_shard] != pointer_cid_obj:
675                target_shard_list[index_in_shard] = pointer_cid_obj
676                await self._shard_data_cache.mark_dirty(shard_idx)
677        return None
678
679    async def exists(self, key: str) -> bool:
680        try:
681            chunk_coords = self._parse_chunk_key(key)
682            if chunk_coords is None:  # Metadata
683                return key in self._root_obj.get("metadata", {})
684            linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
685            shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
686            # Load shard if not cached and check the index
687            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
688            return target_shard_list[index_in_shard] is not None
689        except (ValueError, IndexError, KeyError):
690            return False
691
692    @property
693    def supports_writes(self) -> bool:
694        return not self.read_only
695
696    @property
697    def supports_partial_writes(self) -> bool:
698        return False  # Each chunk CID is written atomically into a shard slot
699
700    @property
701    def supports_deletes(self) -> bool:
702        return not self.read_only
703
704    async def delete(self, key: str) -> None:
705        if self.read_only:
706            raise PermissionError("Cannot delete from a read-only store.")
707
708        chunk_coords = self._parse_chunk_key(key)
709        if chunk_coords is None:  # Metadata
710            if self._root_obj["metadata"].pop(key, None):
711                self._dirty_root = True
712            else:
713                raise KeyError(f"Metadata key '{key}' not found.")
714            return None
715
716        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
717        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
718
719        shard_lock = self._shard_locks[shard_idx]
720        async with shard_lock:
721            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
722            if target_shard_list[index_in_shard] is not None:
723                target_shard_list[index_in_shard] = None
724                await self._shard_data_cache.mark_dirty(shard_idx)
725
726    @property
727    def supports_listing(self) -> bool:
728        return True
729
730    async def list(self) -> AsyncIterator[str]:
731        for key in list(self._root_obj.get("metadata", {})):
732            yield key
733
734    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
735        async for key in self.list():
736            if key.startswith(prefix):
737                yield key
738
739    async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
740        if self.read_only:
741            raise PermissionError("Cannot graft onto a read-only store.")
742
743        store_to_graft = await ShardedZarrStore.open(
744            cas=self.cas, read_only=True, root_cid=store_to_graft_cid
745        )
746        source_chunk_grid = store_to_graft._chunks_per_dim
747        for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]):
748            linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
749            local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
750                linear_local_index
751            )
752            # Load the source shard into its cache
753            source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
754                local_shard_idx
755            )
756
757            pointer_cid_obj = source_shard_list[index_in_local_shard]
758            if pointer_cid_obj is None:
759                continue
760
761            # Calculate global coordinates and write to the main store's index
762            global_coords = tuple(
763                c_local + c_offset
764                for c_local, c_offset in zip(local_coords, chunk_offset)
765            )
766            linear_global_index = self._get_linear_chunk_index(global_coords)
767            global_shard_idx, index_in_global_shard = self._get_shard_info(
768                linear_global_index
769            )
770
771            shard_lock = self._shard_locks[global_shard_idx]
772            async with shard_lock:
773                target_shard_list = await self._load_or_initialize_shard_cache(
774                    global_shard_idx
775                )
776                if target_shard_list[index_in_global_shard] != pointer_cid_obj:
777                    target_shard_list[index_in_global_shard] = pointer_cid_obj
778                    await self._shard_data_cache.mark_dirty(global_shard_idx)
779
780    async def resize_store(self, new_shape: Tuple[int, ...]):
781        """
782        Resizes the store's main shard index to accommodate a new overall array shape.
783        This is a metadata-only operation on the store's root object.
784        Used when doing skeleton writes or appends via xarray where the array shape changes.
785        """
786        if self.read_only:
787            raise PermissionError("Cannot resize a read-only store.")
788        if (
789            # self._root_obj is None
790            self._chunk_shape is None
791            or self._chunks_per_shard is None
792            or self._array_shape is None
793        ):
794            raise RuntimeError("Store is not properly initialized for resizing.")
795        if len(new_shape) != len(self._array_shape):
796            raise ValueError(
797                "New shape must have the same number of dimensions as the old shape."
798            )
799
800        self._array_shape = tuple(new_shape)
801        self._chunks_per_dim = tuple(
802            math.ceil(a / c) if c > 0 else 0
803            for a, c in zip(self._array_shape, self._chunk_shape)
804        )
805        self._total_chunks = math.prod(self._chunks_per_dim)
806        old_num_shards = self._num_shards if self._num_shards is not None else 0
807        self._num_shards = (
808            (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard
809            if self._total_chunks > 0
810            else 0
811        )
812        self._root_obj["chunks"]["array_shape"] = list(self._array_shape)
813        if self._num_shards > old_num_shards:
814            self._root_obj["chunks"]["shard_cids"].extend(
815                [None] * (self._num_shards - old_num_shards)
816            )
817        elif self._num_shards < old_num_shards:
818            self._root_obj["chunks"]["shard_cids"] = self._root_obj["chunks"][
819                "shard_cids"
820            ][: self._num_shards]
821
822        self._dirty_root = True
823
824    async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
825        """
826        Resizes the Zarr metadata for a specific variable (i.e., its 'zarr.json' file).
827        This does NOT change the store's main shard index.
828        """
829        if self.read_only:
830            raise PermissionError("Cannot resize a read-only store.")
831
832        zarr_metadata_key = f"{variable_name}/zarr.json"
833
834        old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key)
835        if not old_zarr_metadata_cid:
836            raise KeyError(
837                f"Cannot find metadata for key '{zarr_metadata_key}' to resize."
838            )
839
840        old_zarr_metadata_bytes = await self.cas.load(old_zarr_metadata_cid)
841        zarr_metadata_json = json.loads(old_zarr_metadata_bytes)
842
843        zarr_metadata_json["shape"] = list(new_shape)
844
845        new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode(
846            "utf-8"
847        )
848        # Metadata is a raw blob of bytes
849        new_zarr_metadata_cid = await self.cas.save(
850            new_zarr_metadata_bytes, codec="raw"
851        )
852
853        self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid
854        self._dirty_root = True
855
856    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
857        seen: Set[str] = set()
858        if prefix == "":
859            async for key in self.list():  # Iterates metadata keys
860                # e.g., if key is "group1/.zgroup" or "array1/.json", first_component is "group1" or "array1"
861                # if key is ".zgroup", first_component is ".zgroup"
862                first_component = key.split("/", 1)[0]
863                if first_component not in seen:
864                    seen.add(first_component)
865                    yield first_component
866        else:
867            raise NotImplementedError("Listing with a prefix is not implemented yet.")

Implements the Zarr Store API using a sharded layout for chunk CIDs.

This store divides the flat index of chunk CIDs into multiple "shards". Each shard is a DAG-CBOR array where each element is either a CID link to a chunk or a null value if the chunk is empty. This structure allows for efficient traversal by IPLD-aware systems.

The store's root object contains:

  1. A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
  2. A list of CIDs, where each CID points to a shard object.
  3. Sharding configuration details (e.g., chunks_per_shard).
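
A hedged sketch of this layout in practice: the root-object field names below come from the source listing above, while the shapes and CIDs are illustrative. It also shows how a chunk key such as "temp/c/36/1/3" maps to a shard slot (row-major linear index, then divmod by chunks_per_shard):

```python
import math

# Illustrative geometry (not from the source)
array_shape, chunk_shape = (365, 180, 360), (10, 90, 90)
chunks_per_dim = tuple(
    math.ceil(a / c) for a, c in zip(array_shape, chunk_shape)
)  # (37, 2, 4) -> 296 chunks total, so one 512-slot shard suffices
chunks_per_shard = 512

# Shape of the root object, per _initialize_new_root (CIDs shown as strings)
root_obj = {
    "manifest_version": "sharded_zarr_v1",
    "metadata": {"zarr.json": "<CID>", "temp/zarr.json": "<CID>"},
    "chunks": {
        "array_shape": list(array_shape),
        "chunk_shape": list(chunk_shape),
        "sharding_config": {"chunks_per_shard": chunks_per_shard},
        "shard_cids": [None],  # one entry per shard; None = never written
    },
}

# Chunk key "temp/c/36/1/3" -> coords (36, 1, 3) -> row-major linear index
coords = (36, 1, 3)
linear = 0
for dim, coord in zip(chunks_per_dim, coords):
    linear = linear * dim + coord  # 295 here
shard_idx, index_in_shard = divmod(linear, chunks_per_shard)  # (0, 295)
```
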
ShardedZarrStore( cas: ContentAddressedStore, read_only: bool, root_cid: Optional[str] = None, *, max_cache_memory_bytes: int = 104857600)
158    def __init__(
159        self,
160        cas: ContentAddressedStore,
161        read_only: bool,
162        root_cid: Optional[str] = None,
163        *,
164        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
165    ):
166        """Use the async `open()` classmethod to instantiate this class."""
167        super().__init__(read_only=read_only)
168        self.cas = cas
169        self._root_cid = root_cid
170        self._root_obj: dict
171
172        self._resize_lock = asyncio.Lock()
173        # An event to signal when a resize is in-progress.
174        # It starts in the "set" state, allowing all operations to proceed.
175        self._resize_complete = asyncio.Event()
176        self._resize_complete.set()
177        self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
178
179        self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
180        self._pending_shard_loads: Dict[int, asyncio.Event] = {}
181
182        self._array_shape: Tuple[int, ...]
183        self._chunk_shape: Tuple[int, ...]
184        self._chunks_per_dim: Tuple[int, ...]
185        self._chunks_per_shard: int
186        self._num_shards: int = 0
187        self._total_chunks: int = 0
188
189        self._dirty_root = False

Use the async open() classmethod to instantiate this class.

cas
@classmethod
async def open( cls, cas: ContentAddressedStore, read_only: bool, root_cid: Optional[str] = None, *, array_shape: Optional[Tuple[int, ...]] = None, chunk_shape: Optional[Tuple[int, ...]] = None, chunks_per_shard: Optional[int] = None, max_cache_memory_bytes: int = 104857600) -> ShardedZarrStore:
210    @classmethod
211    async def open(
212        cls,
213        cas: ContentAddressedStore,
214        read_only: bool,
215        root_cid: Optional[str] = None,
216        *,
217        array_shape: Optional[Tuple[int, ...]] = None,
218        chunk_shape: Optional[Tuple[int, ...]] = None,
219        chunks_per_shard: Optional[int] = None,
220        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
221    ) -> "ShardedZarrStore":
222        """
223        Asynchronously opens an existing ShardedZarrStore or initializes a new one.
224        """
225        store = cls(
226            cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
227        )
228        if root_cid:
229            await store._load_root_from_cid()
230        elif not read_only:
231            if array_shape is None or chunk_shape is None:
232                raise ValueError(
233                    "array_shape and chunk_shape must be provided for a new store."
234                )
235
236            if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0:
237                raise ValueError("chunks_per_shard must be a positive integer.")
238
239            store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
240        else:
241            raise ValueError("root_cid must be provided for a read-only store.")
242        return store

Asynchronously opens an existing ShardedZarrStore or initializes a new one.
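
A hedged usage sketch (assuming a `cas` such as `KuboCAS` from elsewhere in this document; shapes are illustrative):

```python
# New store: geometry is required when no root_cid is given
store = await ShardedZarrStore.open(
    cas,
    read_only=False,
    array_shape=(365, 180, 360),
    chunk_shape=(10, 90, 90),
    chunks_per_shard=512,
)
# ... write via zarr/xarray, then persist dirty shards and the root:
root_cid = await store.flush()

# Re-open the same store read-only from its root CID
store_ro = await ShardedZarrStore.open(cas, read_only=True, root_cid=root_cid)
```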

async def set_partial_values( self, key_start_values: Iterable[typing.Tuple[str, int, bytes | bytearray | memoryview]]) -> None:
452    async def set_partial_values(
453        self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
454    ) -> None:
455        raise NotImplementedError(
456            "Partial writes are not supported by ShardedZarrStore."
457        )

Store values at a given key, starting at byte range_start.

Parameters

key_start_values : list[tuple[str, int, BytesLike]]
    Set of (key, range_start, value) triples. A key may occur multiple times with different range_starts; the range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key.

async def get_partial_values( self, prototype: zarr.core.buffer.core.BufferPrototype, key_ranges: Iterable[typing.Tuple[str, zarr.abc.store.RangeByteRequest | zarr.abc.store.OffsetByteRequest | zarr.abc.store.SuffixByteRequest | None]]) -> List[Optional[zarr.core.buffer.core.Buffer]]:
459    async def get_partial_values(
460        self,
461        prototype: zarr.core.buffer.BufferPrototype,
462        key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
463    ) -> List[Optional[zarr.core.buffer.Buffer]]:
464        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
465        results = await asyncio.gather(*tasks)
466        return results

Retrieve possibly partial values from given key_ranges.

Parameters

prototype : BufferPrototype
    The prototype of the output buffer. Stores may support a default buffer prototype.
key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns

List of values, in the order of the key_ranges; may contain null/None for missing keys.

def with_read_only( self, read_only: bool = False) -> ShardedZarrStore:
468    def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
469        """
470        Return this store (if the flag already matches) or a *shallow*
471        clone that presents the requested read‑only status.
472
473        The clone **shares** the same CAS instance and internal state;
474        no flushing, network traffic or async work is done.
475        """
476        # Fast path
477        if read_only == self.read_only:
478            return self  # Same mode, return same instance
479
480        # Create new instance with different read_only flag
481        # Creates a *bare* instance without running its __init__
482        clone = type(self).__new__(type(self))
483
484        # Copy all attributes from the current instance
485        clone.cas = self.cas
486        clone._root_cid = self._root_cid
487        clone._root_obj = self._root_obj
488
489        clone._resize_lock = self._resize_lock
490        clone._resize_complete = self._resize_complete
491        clone._shard_locks = self._shard_locks
492
493        clone._shard_data_cache = self._shard_data_cache
494        clone._pending_shard_loads = self._pending_shard_loads
495
496        clone._array_shape = self._array_shape
497        clone._chunk_shape = self._chunk_shape
498        clone._chunks_per_dim = self._chunks_per_dim
499        clone._chunks_per_shard = self._chunks_per_shard
500        clone._num_shards = self._num_shards
501        clone._total_chunks = self._total_chunks
502
503        clone._dirty_root = self._dirty_root
504
505        # Re‑initialise the zarr base class so that Zarr sees the flag
506        zarr.abc.store.Store.__init__(clone, read_only=read_only)
507        return clone

Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.

The clone shares the same CAS instance and internal state; no flushing, network traffic or async work is done.
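
A minimal sketch, assuming `reader` is an existing read-only ShardedZarrStore:

```python
# Obtain a writable view that shares the CAS, caches, and locks with `reader`.
writer = reader.with_read_only(read_only=False)
assert writer.supports_writes and not reader.supports_writes
```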

async def flush(self) -> str:
516    async def flush(self) -> str:
517        async with self._shard_data_cache._cache_lock:
518            dirty_shards = list(self._shard_data_cache._dirty_shards)
519        if dirty_shards:
520            for shard_idx in sorted(dirty_shards):
521                # Get the list of CIDs/Nones from the cache
522                shard_data_list = await self._shard_data_cache.get(shard_idx)
523                if shard_data_list is None:
524                    raise RuntimeError(f"Dirty shard {shard_idx} not found in cache")
525
526                # Encode this list into a DAG-CBOR byte representation
527                shard_data_bytes = dag_cbor.encode(shard_data_list)
528
529                # Save the DAG-CBOR block and get its CID
530                new_shard_cid_obj = await self.cas.save(
531                    shard_data_bytes,
532                    codec="dag-cbor",  # Use 'dag-cbor' codec
533                )
534
535                if (
536                    self._root_obj["chunks"]["shard_cids"][shard_idx]
537                    != new_shard_cid_obj
538                ):
539                    # Store the CID object directly
540                    self._root_obj["chunks"]["shard_cids"][shard_idx] = (
541                        new_shard_cid_obj
542                    )
543                    self._dirty_root = True
544                # Mark clean even if the CID is unchanged, so the shard is not re-flushed
545                await self._shard_data_cache.mark_clean(shard_idx)
546
547        if self._dirty_root:
548            # Ensure all metadata CIDs are CID objects for correct encoding
549            self._root_obj["metadata"] = {
550                k: (CID.decode(v) if isinstance(v, str) else v)
551                for k, v in self._root_obj["metadata"].items()
552            }
553            root_obj_bytes = dag_cbor.encode(self._root_obj)
554            new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor")
555            self._root_cid = str(new_root_cid)
556            self._dirty_root = False
557
558        # Ignore because root_cid will always exist after initialization or flush.
559        return self._root_cid  # type: ignore[return-value]
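
flush persists any dirty shards and, when needed, a new root object to the CAS, then returns the store's current root CID. A minimal sketch, assuming `store` is a writable ShardedZarrStore:

```python
# Persist all pending changes and capture the resulting root CID.
root_cid = await store.flush()
print(f"ShardedZarrStore root CID: {root_cid}")
```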
async def get( self, key: str, prototype: zarr.core.buffer.core.BufferPrototype, byte_range: Union[zarr.abc.store.RangeByteRequest, zarr.abc.store.OffsetByteRequest, zarr.abc.store.SuffixByteRequest, NoneType] = None) -> Optional[zarr.core.buffer.core.Buffer]:
561    async def get(
562        self,
563        key: str,
564        prototype: zarr.core.buffer.BufferPrototype,
565        byte_range: Optional[zarr.abc.store.ByteRequest] = None,
566    ) -> Optional[zarr.core.buffer.Buffer]:
567        chunk_coords = self._parse_chunk_key(key)
568        # Metadata request
569        if chunk_coords is None:
570            metadata_cid_obj = self._root_obj["metadata"].get(key)
571            if metadata_cid_obj is None:
572                return None
573            if byte_range is not None:
574                raise ValueError(
575                    "Byte range requests are not supported for metadata keys."
576                )
577            data = await self.cas.load(str(metadata_cid_obj))
578            return prototype.buffer.from_bytes(data)
579        # Chunk data request
580        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
581        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
582
583        # This will load the full shard into cache if it's not already there.
584        shard_lock = self._shard_locks[shard_idx]
585        async with shard_lock:
586            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
587
588        # Get the CID object (or None) from the cached list.
589        chunk_cid_obj = target_shard_list[index_in_shard]
590
591        if chunk_cid_obj is None:
592            return None  # Chunk is empty/doesn't exist.
593
594        chunk_cid_str = str(chunk_cid_obj)
595
596        req_offset = None
597        req_length = None
598        req_suffix = None
599
600        if byte_range:
601            if isinstance(byte_range, RangeByteRequest):
602                req_offset = byte_range.start
603                if byte_range.end is not None:
604                    if byte_range.start > byte_range.end:
605                        raise ValueError(
606                            f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})"
607                        )
608                    req_length = byte_range.end - byte_range.start
609            elif isinstance(byte_range, OffsetByteRequest):
610                req_offset = byte_range.offset
611            elif isinstance(byte_range, SuffixByteRequest):
612                req_suffix = byte_range.suffix
613        data = await self.cas.load(
614            chunk_cid_str, offset=req_offset, length=req_length, suffix=req_suffix
615        )
616        return prototype.buffer.from_bytes(data)

Retrieve the value associated with a given key.

Parameters

key : str
prototype : BufferPrototype
    The prototype of the output buffer. Stores may support a default buffer prototype.
byte_range : ByteRequest, optional
    If not provided, all data associated with the key is retrieved. ByteRequest may be one of the following:
    - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
    - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
    - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns

Buffer
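
A minimal sketch of a byte-range read, assuming zarr-python v3 and an illustrative chunk key:

```python
# Fetch only the last 16 bytes of a chunk via a suffix request.
from zarr.abc.store import SuffixByteRequest
from zarr.core.buffer import default_buffer_prototype

buf = await store.get("temp/c/0/0", default_buffer_prototype(), SuffixByteRequest(16))
tail = buf.to_bytes() if buf is not None else None
```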

async def set(self, key: str, value: zarr.core.buffer.core.Buffer) -> None:
618    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
619        if self.read_only:
620            raise PermissionError("Cannot write to a read-only store.")
621        await self._resize_complete.wait()
622
623        if (
624            key.endswith("zarr.json")
625            and not key.startswith("time/")
626            and not key.startswith(("lat/", "latitude/"))
627            and not key.startswith(("lon/", "longitude/"))
628            and not key == "zarr.json"
629        ):
630            metadata_json = json.loads(value.to_bytes().decode("utf-8"))
631            new_array_shape = metadata_json.get("shape")
632            if not new_array_shape:
633                raise ValueError("Shape not found in metadata.")
634            if tuple(new_array_shape) != self._array_shape:
635                async with self._resize_lock:
636                    # Double-check after acquiring the lock, in case another task
637                    # just finished this exact resize while we were waiting.
638                    if tuple(new_array_shape) != self._array_shape:
639                        # Block all other tasks until resize is complete.
640                        self._resize_complete.clear()
641                        try:
642                            await self.resize_store(new_shape=tuple(new_array_shape))
643                        finally:
644                            # All waiting tasks will now un-pause and proceed safely.
645                            self._resize_complete.set()
646
647        raw_data_bytes = value.to_bytes()
648        # Save the data to CAS first to get its CID.
649        # Metadata is often saved as 'raw', chunks as well unless compressed.
650        try:
651            data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
652            await self.set_pointer(key, str(data_cid_obj))
653        except Exception as e:
654            raise RuntimeError(f"Failed to save data for key {key}: {e}") from e
655        return None  # type: ignore[return-value]

Store a (key, value) pair.

Parameters

key : str
value : Buffer
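
A minimal sketch of a direct write, assuming zarr-python v3's cpu buffer; in normal use zarr itself calls `set`:

```python
# Store group metadata under the root "zarr.json" key.
from zarr.core.buffer import cpu

payload = b'{"zarr_format": 3, "node_type": "group"}'
await store.set("zarr.json", cpu.Buffer.from_bytes(payload))
```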

async def set_pointer(self, key: str, pointer: str) -> None:
657    async def set_pointer(self, key: str, pointer: str) -> None:
658        chunk_coords = self._parse_chunk_key(key)
659
660        pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object
661
662        if chunk_coords is None:  # Metadata key
663            self._root_obj["metadata"][key] = pointer_cid_obj
664            self._dirty_root = True
665            return None
666
667        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
668        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
669
670        shard_lock = self._shard_locks[shard_idx]
671        async with shard_lock:
672            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
673
674            if target_shard_list[index_in_shard] != pointer_cid_obj:
675                target_shard_list[index_in_shard] = pointer_cid_obj
676                await self._shard_data_cache.mark_dirty(shard_idx)
677        return None
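
set_pointer records a CID for a key without re-uploading the data through the store, which is useful when the bytes already live in the CAS. A minimal sketch with an illustrative chunk key:

```python
# Save bytes to the CAS once, then point a chunk key at the resulting CID.
data_cid = await store.cas.save(b"\x00" * 1024, codec="raw")
await store.set_pointer("temp/c/0/0", str(data_cid))
```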
async def exists(self, key: str) -> bool:
679    async def exists(self, key: str) -> bool:
680        try:
681            chunk_coords = self._parse_chunk_key(key)
682            if chunk_coords is None:  # Metadata
683                return key in self._root_obj.get("metadata", {})
684            linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
685            shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
686            # Load shard if not cached and check the index
687            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
688            return target_shard_list[index_in_shard] is not None
689        except (ValueError, IndexError, KeyError):
690            return False

Check if a key exists in the store.

Parameters

key : str

Returns

bool

supports_writes: bool
692    @property
693    def supports_writes(self) -> bool:
694        return not self.read_only

Does the store support writes?

supports_partial_writes: bool
696    @property
697    def supports_partial_writes(self) -> bool:
698        return False  # Each chunk CID is written atomically into a shard slot

Does the store support partial writes?

supports_deletes: bool
700    @property
701    def supports_deletes(self) -> bool:
702        return not self.read_only

Does the store support deletes?

async def delete(self, key: str) -> None:
704    async def delete(self, key: str) -> None:
705        if self.read_only:
706            raise PermissionError("Cannot delete from a read-only store.")
707
708        chunk_coords = self._parse_chunk_key(key)
709        if chunk_coords is None:  # Metadata
710            if self._root_obj["metadata"].pop(key, None):
711                self._dirty_root = True
712            else:
713                raise KeyError(f"Metadata key '{key}' not found.")
714            return None
715
716        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
717        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
718
719        shard_lock = self._shard_locks[shard_idx]
720        async with shard_lock:
721            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
722            if target_shard_list[index_in_shard] is not None:
723                target_shard_list[index_in_shard] = None
724                await self._shard_data_cache.mark_dirty(shard_idx)

Remove a key from the store

Parameters

key : str

supports_listing: bool
726    @property
727    def supports_listing(self) -> bool:
728        return True

Does the store support listing?

async def list(self) -> AsyncIterator[str]:
730    async def list(self) -> AsyncIterator[str]:
731        for key in list(self._root_obj.get("metadata", {})):
732            yield key

Retrieve all keys in the store.

Returns

AsyncIterator[str]

async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
734    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
735        async for key in self.list():
736            if key.startswith(prefix):
737                yield key

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters

prefix : str

Returns

AsyncIterator[str]

async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
739    async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
740        if self.read_only:
741            raise PermissionError("Cannot graft onto a read-only store.")
742
743        store_to_graft = await ShardedZarrStore.open(
744            cas=self.cas, read_only=True, root_cid=store_to_graft_cid
745        )
746        source_chunk_grid = store_to_graft._chunks_per_dim
747        for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]):
748            linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
749            local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
750                linear_local_index
751            )
752            # Load the source shard into its cache
753            source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
754                local_shard_idx
755            )
756
757            pointer_cid_obj = source_shard_list[index_in_local_shard]
758            if pointer_cid_obj is None:
759                continue
760
761            # Calculate global coordinates and write to the main store's index
762            global_coords = tuple(
763                c_local + c_offset
764                for c_local, c_offset in zip(local_coords, chunk_offset)
765            )
766            linear_global_index = self._get_linear_chunk_index(global_coords)
767            global_shard_idx, index_in_global_shard = self._get_shard_info(
768                linear_global_index
769            )
770
771            shard_lock = self._shard_locks[global_shard_idx]
772            async with shard_lock:
773                target_shard_list = await self._load_or_initialize_shard_cache(
774                    global_shard_idx
775                )
776                if target_shard_list[index_in_global_shard] != pointer_cid_obj:
777                    target_shard_list[index_in_global_shard] = pointer_cid_obj
778                    await self._shard_data_cache.mark_dirty(global_shard_idx)
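
graft_store copies the chunk-CID pointers of another ShardedZarrStore into this store's shard index at a given chunk offset; no chunk data is moved. A minimal sketch, where `global_store`, `regional_root_cid`, and the offset are illustrative:

```python
# Splice a regional store's chunks into a global store, then persist.
await global_store.graft_store(regional_root_cid, chunk_offset=(0, 12, 40))
new_root = await global_store.flush()
```
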
async def resize_store(self, new_shape: Tuple[int, ...]):
780    async def resize_store(self, new_shape: Tuple[int, ...]):
781        """
782        Resizes the store's main shard index to accommodate a new overall array shape.
783        This is a metadata-only operation on the store's root object.
784        Used when doing skeleton writes or appends via xarray where the array shape changes.
785        """
786        if self.read_only:
787            raise PermissionError("Cannot resize a read-only store.")
788        if (
789            # self._root_obj is None
790            self._chunk_shape is None
791            or self._chunks_per_shard is None
792            or self._array_shape is None
793        ):
794            raise RuntimeError("Store is not properly initialized for resizing.")
795        if len(new_shape) != len(self._array_shape):
796            raise ValueError(
797                "New shape must have the same number of dimensions as the old shape."
798            )
799
800        self._array_shape = tuple(new_shape)
801        self._chunks_per_dim = tuple(
802            math.ceil(a / c) if c > 0 else 0
803            for a, c in zip(self._array_shape, self._chunk_shape)
804        )
805        self._total_chunks = math.prod(self._chunks_per_dim)
806        old_num_shards = self._num_shards if self._num_shards is not None else 0
807        self._num_shards = (
808            (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard
809            if self._total_chunks > 0
810            else 0
811        )
812        self._root_obj["chunks"]["array_shape"] = list(self._array_shape)
813        if self._num_shards > old_num_shards:
814            self._root_obj["chunks"]["shard_cids"].extend(
815                [None] * (self._num_shards - old_num_shards)
816            )
817        elif self._num_shards < old_num_shards:
818            self._root_obj["chunks"]["shard_cids"] = self._root_obj["chunks"][
819                "shard_cids"
820            ][: self._num_shards]
821
822        self._dirty_root = True

Resizes the store's main shard index to accommodate a new overall array shape. This is a metadata-only operation on the store's root object. Used when doing skeleton writes or appends via xarray where the array shape changes.
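
A minimal sketch, with illustrative shapes (e.g., growing a time dimension before appending):

```python
# Grow the first dimension; only the root shard index metadata changes.
await store.resize_store(new_shape=(1464, 360, 720))
```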

async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
824    async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
825        """
826        Resizes the Zarr metadata for a specific variable (e.g., '.json' file).
827        This does NOT change the store's main shard index.
828        """
829        if self.read_only:
830            raise PermissionError("Cannot resize a read-only store.")
831
832        zarr_metadata_key = f"{variable_name}/zarr.json"
833
834        old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key)
835        if not old_zarr_metadata_cid:
836            raise KeyError(
837                f"Cannot find metadata for key '{zarr_metadata_key}' to resize."
838            )
839
840        old_zarr_metadata_bytes = await self.cas.load(str(old_zarr_metadata_cid))
841        zarr_metadata_json = json.loads(old_zarr_metadata_bytes)
842
843        zarr_metadata_json["shape"] = list(new_shape)
844
845        new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode(
846            "utf-8"
847        )
848        # Metadata is a raw blob of bytes
849        new_zarr_metadata_cid = await self.cas.save(
850            new_zarr_metadata_bytes, codec="raw"
851        )
852
853        self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid
854        self._dirty_root = True

Resizes the Zarr metadata for a specific variable (e.g., '.json' file). This does NOT change the store's main shard index.
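
A minimal sketch, with an illustrative variable name and shape:

```python
# Update one variable's zarr.json to the new shape after an append.
await store.resize_variable("precip", new_shape=(1464, 360, 720))
```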

async def list_dir(self, prefix: str) -> AsyncIterator[str]:
856    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
857        seen: Set[str] = set()
858        if prefix == "":
859            async for key in self.list():  # Iterates metadata keys
860                # e.g., if key is "group1/zarr.json" or "array1/zarr.json", first_component is "group1" or "array1"
861                # if key is "zarr.json", first_component is "zarr.json"
862                first_component = key.split("/", 1)[0]
863                if first_component not in seen:
864                    seen.add(first_component)
865                    yield first_component
866        else:
867            raise NotImplementedError("Listing with a prefix is not implemented yet.")

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters

prefix : str

Returns

AsyncIterator[str]

async def convert_hamt_to_sharded( cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int) -> str:
15async def convert_hamt_to_sharded(
16    cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
17) -> str:
18    """
19    Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.
20
21    Args:
22        cas: An initialized ContentAddressedStore instance (KuboCAS).
23        hamt_root_cid: The root CID of the source ZarrHAMTStore.
24        chunks_per_shard: The number of chunks to group into a single shard in the new store.
25
26    Returns:
27        The root CID of the newly created ShardedZarrStore.
28    """
29    print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---")
30    start_time = time.perf_counter()
31    # 1. Open the source HAMT store for reading
32    print("Opening source HAMT store...")
33    hamt_ro = await HAMT.build(
34        cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
35    )
36    source_store = ZarrHAMTStore(hamt_ro, read_only=True)
37    source_dataset = xr.open_zarr(store=source_store, consolidated=True)
38    # 2. Introspect the source array to get its configuration
39    print("Reading metadata from source store...")
40
41    # Read the store's metadata to get the array shape and chunk shape
42    data_var_name = next(iter(source_dataset.data_vars))
43    ordered_dims = list(source_dataset[data_var_name].dims)
44    array_shape_tuple = tuple(source_dataset.sizes[dim] for dim in ordered_dims)
45    chunk_shape_tuple = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims)
46    array_shape = array_shape_tuple
47    chunk_shape = chunk_shape_tuple
48
49    # 3. Create the destination ShardedZarrStore for writing
50    print(
51        f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
52    )
53    dest_store = await ShardedZarrStore.open(
54        cas=cas,
55        read_only=False,
56        array_shape=array_shape,
57        chunk_shape=chunk_shape,
58        chunks_per_shard=chunks_per_shard,
59    )
60
61    print("Destination store initialized.")
62
63    # 4. Iterate and copy all data from source to destination
64    print("Starting data migration...")
65    count = 0
66    async for key in hamt_ro.keys():
67        count += 1
68        # Read the raw data (metadata or chunk) from the source
69        cid: CID = await hamt_ro.get_pointer(key)
70        cid_base32_str = str(cid.encode("base32"))
71
72        # Write the exact same key-value pair to the destination.
73        await dest_store.set_pointer(key, cid_base32_str)
74        if count % 200 == 0:  # pragma: no cover
75            print(f"Migrated {count} keys...")  # pragma: no cover
76
77    print(f"Migration of {count} total keys complete.")
78
79    # 5. Finalize the new store by flushing it to the CAS
80    print("Flushing new store to get final root CID...")
81    new_root_cid = await dest_store.flush()
82    end_time = time.perf_counter()
83
84    print("\n--- Conversion Complete! ---")
85    print(f"Total time: {end_time - start_time:.2f} seconds")
86    print(f"New ShardedZarrStore Root CID: {new_root_cid}")
87    return new_root_cid

Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

Args:
    cas: An initialized ContentAddressedStore instance (KuboCAS).
    hamt_root_cid: The root CID of the source ZarrHAMTStore.
    chunks_per_shard: The number of chunks to group into a single shard in the new store.

Returns:
    The root CID of the newly created ShardedZarrStore.
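
A minimal usage sketch, assuming a local Kubo node; the source CID is a placeholder:

```python
import asyncio

from py_hamt import KuboCAS, convert_hamt_to_sharded

async def main():
    async with KuboCAS() as cas:
        new_root = await convert_hamt_to_sharded(
            cas=cas, hamt_root_cid="<hamt-root-cid>", chunks_per_shard=6250
        )
        print(new_root)

asyncio.run(main())
```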

async def sharded_converter_cli():
 90async def sharded_converter_cli():
 91    parser = argparse.ArgumentParser(
 92        description="Convert a Zarr HAMT store to a Sharded Zarr store."
 93    )
 94    parser.add_argument(
 95        "hamt_cid", type=str, help="The root CID of the source Zarr HAMT store."
 96    )
 97    parser.add_argument(
 98        "--chunks-per-shard",
 99        type=int,
100        default=6250,
101        help="Number of chunk CIDs to store per shard in the new store.",
102    )
103    parser.add_argument(
104        "--rpc-url",
105        type=str,
106        default="http://127.0.0.1:5001",
107        help="The URL of the IPFS Kubo RPC API.",
108    )
109    parser.add_argument(
110        "--gateway-url",
111        type=str,
112        default="http://127.0.0.1:8080",
113        help="The URL of the IPFS Gateway.",
114    )
115    args = parser.parse_args()
116    # Initialize the KuboCAS client with the provided RPC and Gateway URLs
117    async with KuboCAS(
118        rpc_base_url=args.rpc_url, gateway_base_url=args.gateway_url
119    ) as cas_client:
120        try:
121            await convert_hamt_to_sharded(
122                cas=cas_client,
123                hamt_root_cid=args.hamt_cid,
124                chunks_per_shard=args.chunks_per_shard,
125            )
126        except Exception as e:
127            print(f"\nAn error occurred: {e}")
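
The CLI wraps convert_hamt_to_sharded and reads its arguments from sys.argv via argparse. A minimal sketch of invoking it programmatically:

```python
# Run the converter CLI from a script; argparse parses sys.argv.
import asyncio

from py_hamt import sharded_converter_cli

asyncio.run(sharded_converter_cli())
```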