py_hamt
1from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore 2from .hamt import HAMT, blake3_hashfn 3from .hamt_to_sharded_converter import convert_hamt_to_sharded, sharded_converter_cli 4from .sharded_zarr_store import ShardedZarrStore 5from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS 6from .zarr_hamt_store import ZarrHAMTStore 7 8__all__ = [ 9 "blake3_hashfn", 10 "HAMT", 11 "ContentAddressedStore", 12 "InMemoryCAS", 13 "KuboCAS", 14 "ZarrHAMTStore", 15 "SimpleEncryptedZarrHAMTStore", 16 "ShardedZarrStore", 17 "convert_hamt_to_sharded", 18 "sharded_converter_cli", 19]
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    Default hash function for the `HAMT`: blake3 with a 32 byte digest.

    Takes arbitrary bytes and returns the hash bytes produced by unwrapping the multihash digest.
    """
    # 32 bytes is blake3's recommended (and default) size, but multihash makes us pass it explicitly.
    return b3.unwrap(b3.digest(input_bytes, size=32))
This is the default blake3 hash function used for the HAMT, with a 32 byte hash size.
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

    Note that in read+write, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read only mode and then read `root_node_id`.

    These optimizations also trade off performance for memory use. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refer to, check the documentation with the matching names below.

        Raises `ValueError` if `max_bucket_size` is smaller than 1.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.

        Theoretically your hash size must only be a minimum of 1 byte, and the number of hash collisions must not exceed the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read only or read/write mode.

        The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this behaves the same as creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            # An empty HAMT still needs a root: save a fresh empty Node and use its ID.
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a massive operation. It holds the lock for its whole duration, so every
    # other locking operation on this HAMT waits until the write buffer has been vacated.
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            # cast() is a no-op at runtime; whatever store is current gets vacated.
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid. To read the root node ID at the end of your operations you must first use `make_read_only`.

        Calling this while the HAMT is already writable is a no-op, so that the current write buffer (and any changes it holds) is kept.
        """
        async with self.lock:
            if not self.read_only:
                # Already writable: replacing the store here would drop the existing
                # InMemoryTreeStore instance without vacating it.
                return

            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store
        Takes a stack of nodes, we represent a stack with a list where the first element is the root element and the last element is the top of the stack
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python
        If a node ends up being empty, then it is deleted entirely, unless it is the root node
        Modifies in place
        """
        # iterate in the reverse direction, this range goes from n-1 to 0, from the bottommost tree node to the root
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                # When removing links, don't worry about two nodes having the same link since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Remove from our stack, continue reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # If not an empty node, just reserialize like normal and replace this one
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # If this is not the last i.e. root node, we need to change the linking of the node prior in the list since we just reserialized
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # Encoding is skipped only when `values_are_bytes` is set; the value itself is not inspected
    async def set(self, key: str, val: IPLDKind) -> None:
        """
        Write a key-value mapping.

        The value is saved to the CAS (dag-cbor encoded unless `values_are_bytes` is set) and a pointer to it is stored in the HAMT under `key`.

        Raises an `Exception` if the HAMT is read only.
        """
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        """Insert `key -> val_ptr` into the tree, splitting overflowing buckets into child nodes as needed."""
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert
            # This is needed if any buckets overflow and so we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = [(key, val_ptr)]

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    # A link: descend into the child node and retry the same KV one level deeper
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    kvs_queue.extend(bucket.items())

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """
        Delete a key-value mapping.

        Raises an `Exception` if the HAMT is read only, and `KeyError(key)` if the key is not present.
        """

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here so either now reserialize or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))
                else:
                    # Neither a bucket nor a link: nowhere left to walk. Stop instead of
                    # looping forever on the same slot; this ends in the KeyError below.
                    break

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError(key)

    async def get(
        self,
        key: str,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> IPLDKind:
        """
        Get a value.

        `offset`, `length` and `suffix` are passed through unchanged to the CAS `load` call. The loaded bytes are returned as-is when `values_are_bytes` is set, otherwise they are dag-cbor decoded.

        Raises `KeyError` if the key is not present.
        """
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(
            pointer, offset=offset, length=length, suffix=suffix
        )
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.

        Raises `KeyError` if the key is not present.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        """Walk the tree from the root following the key's hash and return the stored pointer, raising `KeyError(key)` if absent."""
        with instrumentation.span(
            "py_hamt.hamt.lookup", {"py_hamt.hamt.lookup.key": key}
        ):
            lookup_started_at = time.perf_counter()
            raw_hash: bytes = self.hash_fn(key.encode())

            current_id: IPLDKind = self.root_node_id
            current_depth: int = 0
            node_loads = 0
            node_cache_hits = 0

            # Don't check if result is none but use a boolean to indicate finding something, this is because None is a possible value of IPLDKind
            result_ptr: IPLDKind = None
            found_a_result: bool = False
            try:
                while True:
                    top_id: IPLDKind = current_id
                    # Cache hits are only counted for the read cache, and must be checked before the load
                    if (
                        isinstance(self.node_store, ReadCacheStore)
                        and top_id in self.node_store.cache
                    ):
                        node_cache_hits += 1
                    node_loads += 1
                    top_node: Node = await self.node_store.load(top_id)
                    map_key: int = extract_bits(raw_hash, current_depth, 8)

                    # Check if this key is in one of the buckets
                    item = top_node.data[map_key]
                    if isinstance(item, dict):
                        bucket = item
                        if key in bucket:
                            result_ptr = bucket[key]
                            found_a_result = True
                            break

                    if isinstance(item, list):
                        link: IPLDKind = item[0]
                        current_id = link
                        current_depth += 1
                        continue

                    # Nowhere left to go, stop walking down the tree
                    break

                if not found_a_result:
                    raise KeyError(key)

                return result_ptr
            finally:
                # Record the lookup on both the success and the KeyError path
                instrumentation.record_hamt_lookup(
                    key,
                    depth=current_depth,
                    node_loads=node_loads,
                    node_cache_hits=node_cache_hits,
                    found=found_a_result,
                    seconds=time.perf_counter() - lookup_started_at,
                )

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        """Yield `(node_id, node)` for every node reachable from the root, using an explicit stack."""
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(node.iter_links())

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        """Yield every key of every bucket of every node; callers handle locking."""
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count
An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.
Use this to store arbitrarily large key-value mappings in your CAS of choice.
For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.
When in read-only mode, the HAMT is both async and thread safe.
A note about memory management, read+write and read-only modes
The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.
Note that in read+write, the real root node id IS NOT VALID. You should call make_read_only() to convert to read only mode and then read root_node_id.
These optimizations also trade off performance for memory use. Use cache_size to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use cache_vacate if you are over your memory limits.
IPFS HAMT Sample Code
kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id # our root node CID
print(cid)
def __init__(
    self,
    cas: ContentAddressedStore,
    hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
    root_node_id: IPLDKind | None = None,
    read_only: bool = False,
    max_bucket_size: int = 4,
    values_are_bytes: bool = False,
):
    """
    Construct a HAMT over an existing root node. To create a completely empty HAMT use `build` instead, since that requires async operations with the CAS. Each constructor argument is documented on the attribute of the same name below.
    """
    # Reject an invalid bucket size before any state is set up.
    if max_bucket_size < 1:
        raise ValueError("Bucket size maximum must be a positive integer")

    self.cas: ContentAddressedStore = cas
    """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

    self.hash_fn: Callable[[bytes], bytes] = hash_fn
    """
    The hash function used to place a key-value within the HAMT.

    A custom hash function takes in arbitrarily long bytes and returns the hash bytes.

    The resulting hash is always a multiple of 8 bits, since a python bytes object can only represent whole bytes.

    Theoretically the hash only needs to be 1 byte long, as long as the number of hash collisions does not exceed the bucket size. Any more and the HAMT will most likely throw errors.
    """

    self.lock: asyncio.Lock = asyncio.Lock()
    """@private"""

    self.values_are_bytes: bool = values_are_bytes
    """Set this to true if only python bytes objects will be stored in the hamt. This improves performance by skipping a serialization step from IPLDKind.

    Changing this in between operations is theoretically safe but has not been verified in testing, so do it at your own risk.
    """

    self.max_bucket_size: int = max_bucket_size
    """
    Only important for tuning write performance! When reading a HAMT written with a different max bucket size, this does not need to match and can be left unprovided.

    This is an internal detail exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by sharding the mappings across many smaller Nodes. The memory footprint of each Node is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

    This must be a positive integer with a minimum of 1.
    """

    self.root_node_id: IPLDKind = root_node_id
    """
    This is type IPLDKind but the documentation generator pdoc mangles it a bit.

    Read from this only when in read mode to get something valid!
    """

    self.read_only: bool = read_only
    """Clients should NOT modify this.

    Indicates whether the HAMT is in read only or read/write mode.

    The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, reads stay strongly consistent through async locks, and writes go to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
    """

    # Read-only HAMTs get the read cache; writable ones get the in-memory write buffer.
    self.node_store: NodeStore = (
        ReadCacheStore(self) if read_only else InMemoryTreeStore(self)
    )
    """@private"""
Use build if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refer to, check the documentation with the matching names below.
The backing storage system. py-hamt provides an implementation KuboCAS for IPFS.
This is the hash function used to place a key-value within the HAMT.
To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.
It's important to note that the resulting hash must always be a multiple of 8 bits, since a python bytes object can only represent data in whole bytes, i.e. segments of 8 bits.
Theoretically your hash size only needs to be a minimum of 1 byte, as long as the number of hash collisions is less than or equal to the bucket size. Any more and the HAMT will most likely throw errors.
Set this to true if you are only going to be storing python bytes objects into the hamt. This will improve performance by skipping a serialization step from IPLDKind.
This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.
This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.
This must be a positive integer with a minimum of 1.
This is type IPLDKind but the documentation generator pdoc mangles it a bit.
Read from this only when in read mode to get something valid!
Clients should NOT modify this.
This is here for checking whether the HAMT is in read only or read/write mode.
The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, for reads the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
    """
    Async constructor for a completely empty HAMT, i.e. one where root_node_id is None. Takes exactly the same arguments as `__init__`. When a root_node_id is supplied, the result is the same as calling `__init__` directly.

    This exists because an empty HAMT needs an initial root node saved through the node store, which is an async operation, and python does not allow an async __init__.
    """
    instance = cls(*args, **kwargs)
    if instance.root_node_id is not None:
        # An existing root was supplied, nothing needs to be created.
        return instance

    # No root yet: save a fresh empty Node and adopt its ID as the root.
    instance.root_node_id = await instance.node_store.save(None, Node())
    return instance
Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are exactly the same as for __init__. If the root_node_id is not None, this behaves no differently from creating a HAMT instance with __init__.
This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. python does not allow for an async __init__, so this method is separately provided.
async def make_read_only(self) -> None:
    """
    Switch the HAMT to read only mode, which allows for more parallel read operations. The HAMT has to be in read only mode for `root_node_id` to be the real root node ID.

    In read+write mode, separate get calls normally block each other so that reads stay strongly consistent even if a set/delete operation falls in between.
    """
    async with self.lock:
        # cast() only informs the type checker; whatever store is current gets vacated.
        write_buffer = cast(InMemoryTreeStore, self.node_store)
        await write_buffer.vacate()

        # Flip the mode flag first, then swap in a fresh read cache.
        self.read_only = True
        self.node_store = ReadCacheStore(self)
Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
async def enable_write(self) -> None:
    """
    Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid. To read the root node ID at the end of your operations you must first use `make_read_only`.

    Calling this while the HAMT is already writable is a no-op, so that the current write buffer (and any changes it holds) is kept.
    """
    async with self.lock:
        if not self.read_only:
            # Already writable: replacing the store here would drop the existing
            # InMemoryTreeStore instance without vacating it.
            return

        # The read cache has no writes that need to be sent upstream so we can remove it without vacating
        self.read_only = False
        self.node_store = InMemoryTreeStore(self)
Enable both reads and writes. This creates an internal structure for performance optimizations, which will result in the root node ID no longer being valid. To read the root node ID at the end of your operations, you must first use make_read_only.
435 async def cache_size(self) -> int: 436 """ 437 Returns the memory used by some internal performance optimization tools in bytes. 438 439 This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however. 440 441 Be warned that this may take a while to run for large HAMTs. 442 443 For more on memory management, see the `HAMT` class documentation. 444 """ 445 if self.read_only: 446 return self.node_store.size() 447 async with self.lock: 448 return self.node_store.size()
Returns the memory used by some internal performance optimization tools in bytes.
This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
Be warned that this may take a while to run for large HAMTs.
For more on memory management, see the HAMT class documentation.
450 async def cache_vacate(self) -> None: 451 """ 452 Vacate and completely empty out the internal read/write cache. 453 454 Be warned that this may take a while if there have been a lot of write operations. 455 456 For more on memory management, see the `HAMT` class documentation. 457 """ 458 if self.read_only: 459 await self.node_store.vacate() 460 else: 461 async with self.lock: 462 await self.node_store.vacate()
Vacate and completely empty out the internal read/write cache.
Be warned that this may take a while if there have been a lot of write operations.
For more on memory management, see the HAMT class documentation.
async def set(self, key: str, val: IPLDKind) -> None:
    """
    Write a key-value mapping.

    The value is saved to the CAS with the "raw" codec and the returned pointer is stored in the HAMT under `key`. Raises an `Exception` when the HAMT is read only.
    """
    if self.read_only:
        raise Exception("Cannot call set on a read only HAMT")

    # With values_are_bytes the value is passed through untouched (cast does no runtime check),
    # so a non-bytes value surfaces as an error further down; otherwise it is dag-cbor encoded.
    payload: bytes = (
        cast(bytes, val) if self.values_are_bytes else dag_cbor.encode(val)
    )

    value_ptr: IPLDKind = await self.cas.save(payload, codec="raw")
    await self._set_pointer(key, value_ptr)
Write a key-value mapping.
async def delete(self, key: str) -> None:
    """
    Delete a key-value mapping.

    Raises an `Exception` if the HAMT is read only, and `KeyError(key)` if the key is not present.
    """

    # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
    if self.read_only:
        raise Exception("Cannot call delete on a read only HAMT")

    async with self.lock:
        raw_hash: bytes = self.hash_fn(key.encode())

        node_stack: list[tuple[IPLDKind, Node]] = []
        root_node: Node = await self.node_store.load(self.root_node_id)
        node_stack.append((self.root_node_id, root_node))

        created_change: bool = False
        while True:
            _, top_node = node_stack[-1]
            # Each tree level uses the next 8 bits of the hash as the slot index
            map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    del bucket[key]
                    created_change = True
                # Break out since whether or not the key is in the bucket, it should have been here so either now reserialize or raise a KeyError
                break
            elif isinstance(item, list):
                link: IPLDKind = item[0]
                next_node: Node = await self.node_store.load(link)
                node_stack.append((link, next_node))
            else:
                # Neither a bucket nor a link: nowhere left to walk. Stop instead of
                # looping forever on the same slot; this ends in the KeyError below.
                break

        # Finally, reserialize and fix all links, deleting empty nodes as needed
        if created_change:
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]
        else:
            # If we didn't make a change, then this key must not exist within the HAMT
            raise KeyError(key)
Delete a key-value mapping.
async def get(
    self,
    key: str,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> IPLDKind:
    """Get a value.

    The pointer for `key` is resolved first, then the data is loaded from the
    CAS; `offset`, `length` and `suffix` are forwarded unchanged to that load.
    The loaded bytes are returned as they are when `values_are_bytes` is set,
    and DAG-CBOR decoded otherwise.
    """
    value_id: IPLDKind = await self.get_pointer(key)
    raw: bytes = await self.cas.load(
        value_id, offset=offset, length=length, suffix=suffix
    )
    return raw if self.values_are_bytes else dag_cbor.decode(raw)
Get a value.
async def get_pointer(self, key: str) -> IPLDKind:
    """
    Get a store ID that points to the value for this key.

    This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
    """
    if not self.read_only:
        # Write-enabled: take the lock for the duration of the lookup
        async with self.lock:
            return await self._get_pointer(key)

    # If read only, no need to acquire a lock
    return await self._get_pointer(key)
Get a store ID that points to the value for this key.
This is useful for some applications that want to implement a read cache. Due to the restrictions of ContentAddressedStore on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in ZarrHAMTStore for example.
async def keys(self) -> AsyncIterator[str]:
    """
    AsyncIterator returning all keys in the HAMT.

    If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

    When the HAMT is in read only mode however, this can be run concurrently with get operations.
    """
    if not self.read_only:
        # The lock stays held until the iteration is finished or abandoned
        async with self.lock:
            async for name in self._keys_no_locking():
                yield name
        return

    async for name in self._keys_no_locking():
        yield name
AsyncIterator returning all keys in the HAMT.
If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
When the HAMT is in read only mode however, this can be run concurrently with get operations.
async def len(self) -> int:
    """
    Return the number of key value mappings in this HAMT.

    The count is obtained by iterating over `keys()`, so the locking behaviour is the one of `keys()`: when the HAMT is write enabled a lock is held until the length is fully calculated, and when it is read only this can be run concurrently with other operations.
    """
    total: int = 0
    async for _key in self.keys():
        total = total + 1
    return total
Return the number of key value mappings in this HAMT.
When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
class ContentAddressedStore(ABC):
    """
    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

    #### A note on the IPLDKind return types
    Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
    1. No lists or dicts, since python does not classify these as immutable.
    2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
    """

    # The two codec labels a caller may hand to `save`
    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Store `data` and return the ID (an IPLDKind value) it can be loaded back with.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """

    @abstractmethod
    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """Return the bytes stored under `id`; `offset`, `length` and `suffix` optionally restrict the read to a part of the data."""

    async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Pin a CID in the storage. Does nothing unless overridden."""
        return None  # pragma: no cover

    async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Unpin a CID in the storage. Does nothing unless overridden."""
        return None  # pragma: no cover

    async def pin_update(
        self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
    ) -> None:
        """Update the pinned CID in the storage. Does nothing unless overridden."""
        return None  # pragma: no cover

    async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]:
        """List all pinned CIDs in the storage. Returns an empty list unless overridden."""
        no_pins: list[Dict[str, Any]] = []
        return no_pins  # pragma: no cover
Abstract class that represents a content addressed storage that the HAMT can use for keeping data.
Note that the return type of save and input to load is really type IPLDKind, but the documentation generator pdoc mangles it unfortunately.
A note on the IPLDKind return types
Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
- No lists or dicts, since python does not classify these as immutable.
- No `None` values, since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
@abstractmethod
async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
    """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

    `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.

    Abstract: this declaration has no implementation of its own, subclasses provide the storage logic.
    """
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
@abstractmethod
async def load(
    self,
    id: IPLDKind,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    """Retrieve the data stored under `id` as bytes.

    `offset`, `length` and `suffix` are optional and default to `None`; they let a caller ask for a part of the data instead of all of it.

    Abstract: this declaration has no implementation of its own, subclasses provide the retrieval logic.
    """
Retrieve data.
async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
    """Pin a CID in the storage. This default implementation does nothing."""
    return None  # pragma: no cover
Pin a CID in the storage.
async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
    """Unpin a CID in the storage. This default implementation does nothing."""
    return None  # pragma: no cover
Unpin a CID in the storage.
async def pin_update(
    self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
) -> None:
    """Update the pinned CID in the storage. This default implementation does nothing."""
    return None  # pragma: no cover
Update the pinned CID in the storage.
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing, this is why this is not exported. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self):
        self.store = dict()
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        """Store `data` under its 32 byte digest and return that digest. `codec` is not used."""
        digest: bytes = self.hash_alg.digest(data, size=32)
        self.store[digest] = data
        return digest

    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

        Range selection: with `offset`, the bytes from `offset` on are returned, limited to
        `length` bytes when `length` is given. Without `offset`, `suffix` returns the last
        `suffix` bytes (nothing for `suffix=0`). `length` without `offset` is ignored.

        Raises:
            TypeError: if `id` is neither `bytes` nor `bytearray`.
            KeyError: if nothing is stored under `id`.
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
            )
        data: bytes
        try:
            # bytes() makes a bytearray id hashable so it can be used for the lookup
            data = self.store[bytes(key)]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

        if offset is not None:
            if length is not None:
                return data[offset : offset + length]
            return data[offset:]
        if suffix is not None:
            # data[-0:] is the whole object, so an empty suffix needs its own answer
            if suffix == 0:
                return b""
            return data[-suffix:]
        return data  # Full load
Used mostly for faster testing, this is why this is not exported. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that save returns and load takes in.
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
    """Store `data` under its 32 byte digest and return that digest as the ID. `codec` is not used."""
    digest: bytes = self.hash_alg.digest(data, size=32)
    self.store[digest] = data
    return digest
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
async def load(
    self,
    id: IPLDKind,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    """
    `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
    backend we *require* a `bytes` hash; anything else is rejected at run
    time. In OO type-checking, a subclass may widen (make more general) argument types,
    but it must never narrow them; otherwise callers that expect the base-class contract can break.
    Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
    This is why we use `cast` here, to tell mypy that we know what we are doing.
    h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch

    Range selection: with `offset`, the bytes from `offset` on are returned, limited to
    `length` bytes when `length` is given. Without `offset`, `suffix` returns the last
    `suffix` bytes (nothing for `suffix=0`). `length` without `offset` is ignored.

    Raises:
        TypeError: if `id` is neither `bytes` nor `bytearray`.
        KeyError: if nothing is stored under `id`.
    """
    key = cast(bytes, id)
    if not isinstance(key, (bytes, bytearray)):  # defensive guard
        raise TypeError(
            f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}"
        )
    data: bytes
    try:
        # bytes() makes a bytearray id hashable so it can be used for the lookup
        data = self.store[bytes(key)]
    except KeyError as exc:
        raise KeyError("Object not found in in-memory store") from exc

    if offset is not None:
        if length is not None:
            return data[offset : offset + length]
        return data[offset:]
    if suffix is not None:
        # data[-0:] is the whole object, so an empty suffix needs its own answer
        if suffix == 0:
            return b""
        return data[-suffix:]
    return data  # Full load
ContentAddressedStore allows any IPLD scalar key. For the in-memory
backend we require a bytes hash; anything else is rejected at run
time. In OO type-checking, a subclass may widen (make more general) argument types,
but it must never narrow them; otherwise callers that expect the base-class contract can break.
Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
This is why we use cast here, to tell mypy that we know what we are doing.
h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
class KuboCAS(ContentAddressedStore):
    """
    Connects to an **IPFS Kubo** daemon.

    The IDs in save and load are IPLD CIDs.

    * **save()** → RPC (`/api/v0/add`)
    * **load()** → HTTP gateway (`/ipfs/{cid}`)

    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.

    ### Authentication / custom headers
    You have two options:

    1. **Bring your own `httpx.AsyncClient`**
       Pass it via `client=...` — any default headers or auth
       configured on that client are reused for **every** request.
    2. **Let `KuboCAS` build the client** but pass
       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
       internally–created `AsyncClient`.

    ```python
    import httpx
    from py_hamt import KuboCAS

    # Option 1: user-supplied client
    client = httpx.AsyncClient(
        headers={"Authorization": "Bearer <token>"},
        auth=("user", "pass"),
    )
    cas = KuboCAS(client=client)

    # Option 2: let KuboCAS create the client
    cas = KuboCAS(
        headers={"X-My-Header": "yes"},
        auth=("user", "pass"),
    )
    ```

    ### Parameters
    - **hasher** (str): multihash name (defaults to *blake3*).
    - **client** (`httpx.AsyncClient | None`): reuse an existing
      client; if *None* KuboCAS will create one lazily.
    - **headers** (dict[str, str] | None): default headers for the
      internally-created client.
    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
      for the internally-created client.
    - **rpc_base_url / gateway_base_url** (str | None): override daemon
      endpoints (defaults match the local daemon ports).
    - **chunker** (str): chunking algorithm specification for Kubo's `add`
      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
      `"rabin-<min>-<avg>-<max>"`.

    ...
    """

    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"

    DAG_PB_MARKER: int = 0x70
    """@private"""

    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
    def __init__(
        self,
        hasher: str = "blake3",
        client: httpx.AsyncClient | None = None,
        rpc_base_url: str | None = None,
        gateway_base_url: str | None = None,
        concurrency: int = 32,
        *,
        headers: dict[str, str] | None = None,
        auth: Tuple[str, str] | None = None,
        pin_on_add: bool = False,
        chunker: str = "size-1048576",
        max_retries: int = 3,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
    ):
        """
        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

        ### `httpx.AsyncClient` Management
        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.

        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
        ```python
        async with httpx.AsyncClient() as client, KuboCAS(
            rpc_base_url=rpc_base_url,
            gateway_base_url=gateway_base_url,
            client=client,
        ) as kubo_cas:
            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
            zhs = ZarrHAMTStore(hamt)
            # Use the KuboCAS instance as needed
            # ...
        ```
        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
        ``` python
        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
        # Use the KuboCAS instance as needed
        # ...
        await cas.aclose()  # Ensure resources are cleaned up
        ```

        ### Authenticated RPC/Gateway Access
        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
        If you do not need authentication, you can leave these parameters as `None`.

        ### RPC and HTTP Gateway Base URLs
        These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided.
        A trailing `/` on either base url is tolerated.

        ### Errors
        `ValueError` is raised for an invalid `chunker` specification, a negative `max_retries`, a non-positive `initial_delay` or a `backoff_factor` below 1.0.
        When `client` is supplied, the constructor has to run inside a running event loop, since the client is registered for that loop; otherwise `asyncio.get_running_loop()` raises `RuntimeError`.
        """

        # Safe defaults first, so that __del__ finds a consistent (closed, not
        # owning anything) object if validation below raises.
        self._owns_client: bool = False
        self._closed: bool = True
        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
        # store for later use by _loop_client()
        self._default_headers = headers
        self._default_auth = auth

        # Now, perform all validation that might raise an exception, before any
        # further state is set up
        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
        if re.fullmatch(chunker_pattern, chunker) is None:
            raise ValueError("Invalid chunker specification")
        if max_retries < 0:
            raise ValueError("max_retries must be non-negative")
        if initial_delay <= 0:
            raise ValueError("initial_delay must be positive")
        if backoff_factor < 1.0:
            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")

        self.chunker: str = chunker
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.backoff_factor = backoff_factor

        self.hasher: str = hasher
        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""

        if rpc_base_url is None:
            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
        if gateway_base_url is None:
            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

        # Avoid "//api/v0/add" when the base url was given with a trailing slash
        rpc_base_url = rpc_base_url.rstrip("/")

        if "/ipfs/" in gateway_base_url:
            gateway_base_url = gateway_base_url.split("/ipfs/")[0]

        # Standard gateway URL construction with proper path handling
        if gateway_base_url.endswith("/"):
            gateway_base_url = f"{gateway_base_url}ipfs/"
        else:
            gateway_base_url = f"{gateway_base_url}/ipfs/"

        pin_string: str = "true" if pin_on_add else "false"
        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
        """@private"""
        self.gateway_base_url: str = gateway_base_url
        """@private"""

        if client is not None:
            # A client was supplied by the user. We don't own it.
            self._owns_client = False
            self._client_per_loop = {asyncio.get_running_loop(): client}
        else:
            # No client supplied. We will own any clients we create.
            self._owns_client = True
            self._client_per_loop = {}

        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
        # Only now is the instance fully usable
        self._closed = False

    # --------------------------------------------------------------------- #
    # helper: get or create the client bound to the current running loop    #
    # --------------------------------------------------------------------- #
    def _loop_client(self) -> httpx.AsyncClient:
        """Get or create a client for the current event loop.

        If the instance was previously closed but owns its clients, a fresh
        client mapping is lazily created on demand. Users that supplied their
        own ``httpx.AsyncClient`` still receive an error when the instance has
        been closed, as we cannot safely recreate their client.
        """
        if self._closed:
            if not self._owns_client:
                raise RuntimeError("KuboCAS is closed; create a new instance")
            # We previously closed all internally-owned clients. Reset the
            # state so that new clients can be created lazily.
            self._closed = False
            self._client_per_loop = {}

        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        try:
            return self._client_per_loop[loop]
        except KeyError:
            # Create a new client
            client = httpx.AsyncClient(
                timeout=60.0,
                headers=self._default_headers,
                auth=self._default_auth,
                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
                # Uncomment when they finally support Robust HTTP/2 GOAWAY responses
                # http2=True,
            )
            self._client_per_loop[loop] = client
            return client

    # --------------------------------------------------------------------- #
    # graceful shutdown: close **all** clients we own                       #
    # --------------------------------------------------------------------- #
    async def aclose(self) -> None:
        """
        Closes all internally-created clients. Must be called from an async context.

        Does nothing when the client was supplied by the user; closing it is then up to the caller.
        """
        if self._owns_client is False:  # external client → caller closes
            return

        # This method is async, so we can reliably await the async close method.
        # The complex sync/async logic is handled by __del__.
        for client in list(self._client_per_loop.values()):
            if not client.is_closed:
                try:
                    await client.aclose()
                except Exception:
                    pass  # best-effort cleanup

        # Every client has been handled above, so the mapping can be dropped
        self._client_per_loop.clear()
        self._closed = True

    async def __aenter__(self) -> "KuboCAS":
        return self

    async def __aexit__(self, *exc: Any) -> None:
        await self.aclose()

    def __del__(self) -> None:
        """Best-effort close for internally-created clients."""
        # __init__ may have failed before these attributes were set
        if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
            return

        if not self._owns_client or self._closed:
            return

        # Attempt proper cleanup if possible
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop - can't do async cleanup
            # Just clear the client references synchronously
            if hasattr(self, "_client_per_loop"):
                # We can't await client.aclose() without a loop,
                # so just clear the references
                self._client_per_loop.clear()
                self._closed = True
            return

        # If we get here, we have a running loop
        try:
            if loop.is_running():
                # Schedule cleanup in the existing loop
                loop.create_task(self.aclose())
            else:
                # Loop exists but not running - try asyncio.run
                coro = self.aclose()  # Create the coroutine
                try:
                    asyncio.run(coro)
                except Exception:
                    # If asyncio.run fails, we need to close the coroutine properly
                    coro.close()  # This prevents the RuntimeWarning
                    raise  # Re-raise to hit the outer except block
        except Exception:
            # If all else fails, just clear references
            if hasattr(self, "_client_per_loop"):
                self._client_per_loop.clear()
                self._closed = True

    # --------------------------------------------------------------------- #
    # save() – now uses the per-loop client                                 #
    # --------------------------------------------------------------------- #
    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
        """Upload `data` through the RPC `add` endpoint and return the resulting CID.

        The CID reported by the daemon is re-tagged with `codec`, unless its codec
        code equals `DAG_PB_MARKER`, in which case it is returned as reported.

        Timeouts and other request errors are retried up to `max_retries` times with
        exponential backoff and a small jitter. Once the retries are used up an
        `httpx.TimeoutException` is raised, chained to the last underlying error.
        `httpx.HTTPStatusError` is never retried.
        """
        async with self._sem:
            files = {"file": data}
            client = self._loop_client()
            retry_count = 0

            while retry_count <= self.max_retries:
                try:
                    response = await client.post(
                        self.rpc_url, files=files, timeout=60.0
                    )
                    response.raise_for_status()
                    cid_str: str = response.json()["Hash"]
                    cid: CID = CID.decode(cid_str)
                    if cid.codec.code != self.DAG_PB_MARKER:
                        cid = cid.set(codec=codec)
                    return cid

                except (httpx.TimeoutException, httpx.RequestError) as e:
                    retry_count += 1
                    if retry_count > self.max_retries:
                        raise httpx.TimeoutException(
                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
                            request=e.request
                            if isinstance(e, httpx.RequestError)
                            else None,
                        ) from e

                    # Calculate backoff delay
                    delay = self.initial_delay * (
                        self.backoff_factor ** (retry_count - 1)
                    )
                    # Add some jitter to prevent thundering herd
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)

                except httpx.HTTPStatusError:
                    # Re-raise non-timeout HTTP errors immediately
                    raise
        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """Load data from a CID using the IPFS gateway with optional Range requests.

        Range selection: `offset` together with `length` requests `length` bytes
        starting at `offset`; `offset` alone requests everything from `offset` on;
        `suffix` (only looked at when `offset` is None) requests the last `suffix` bytes.

        Timeouts and other request errors are retried like in `save`; once the retries
        are used up an `httpx.TimeoutException` is raised, chained to the last
        underlying error. `httpx.HTTPStatusError` is never retried. Every call is
        reported to `instrumentation`, whatever its outcome.
        """
        cid = cast(CID, id)
        url: str = f"{self.gateway_base_url + str(cid)}"
        headers: Dict[str, str] = {}

        # Construct the Range header if required
        if offset is not None:
            start = offset
            if length is not None:
                # Standard HTTP Range: bytes=start-end (inclusive)
                end = start + length - 1
                headers["Range"] = f"bytes={start}-{end}"
            else:
                # Standard HTTP Range: bytes=start- (from start to end)
                headers["Range"] = f"bytes={start}-"
        elif suffix is not None:
            # Standard HTTP Range: bytes=-N (last N bytes)
            headers["Range"] = f"bytes=-{suffix}"

        trace_started_at = instrumentation.begin_cas_load(cid, bool(headers))
        response_bytes = 0
        final_status = "ok"
        final_retry_count = 0
        try:
            async with self._sem:  # Throttle gateway
                client = self._loop_client()
                retry_count = 0

                while retry_count <= self.max_retries:
                    try:
                        response = await client.get(
                            url, headers=headers or None, timeout=60.0
                        )
                        response.raise_for_status()
                        content = response.content
                        response_bytes = len(content)
                        final_retry_count = retry_count
                        return content

                    except (httpx.TimeoutException, httpx.RequestError) as e:
                        retry_count += 1
                        if retry_count > self.max_retries:
                            final_status = "timeout"
                            final_retry_count = retry_count
                            raise httpx.TimeoutException(
                                f"Failed to load data after {self.max_retries} retries: {str(e)}",
                                request=e.request
                                if isinstance(e, httpx.RequestError)
                                else None,
                            ) from e

                        # Calculate backoff delay with jitter
                        delay = self.initial_delay * (
                            self.backoff_factor ** (retry_count - 1)
                        )
                        jitter = delay * 0.1 * (random.random() - 0.5)
                        await asyncio.sleep(delay + jitter)

                    except httpx.HTTPStatusError:
                        # Re-raise non-timeout HTTP errors immediately
                        final_status = "http_error"
                        final_retry_count = retry_count
                        raise
        finally:
            instrumentation.end_cas_load(
                trace_started_at,
                byte_count=response_bytes,
                retries=final_retry_count,
                status=final_status,
            )
        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

    # --------------------------------------------------------------------- #
    # pin_cid() – method to pin a CID                                       #
    # --------------------------------------------------------------------- #
    async def pin_cid(
        self,
        cid: CID,
        target_rpc: str = "http://127.0.0.1:5001",
    ) -> None:
        """
        Pins a CID to the local Kubo node via the RPC API.

        This call is recursive by default, pinning all linked objects.

        Args:
            cid (CID): The Content ID to pin.
            target_rpc (str): The RPC URL of the Kubo node.
        """
        params = {"arg": str(cid), "recursive": "true"}
        pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add"

        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(pin_add_url_base, params=params)
            response.raise_for_status()

    async def unpin_cid(
        self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
    ) -> None:
        """
        Unpins a CID from the local Kubo node via the RPC API.

        Args:
            cid (CID): The Content ID to unpin.
            target_rpc (str): The RPC URL of the Kubo node.
        """
        params = {"arg": str(cid), "recursive": "true"}
        unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm"
        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(unpin_url_base, params=params)
            response.raise_for_status()

    async def pin_update(
        self,
        old_id: IPLDKind,
        new_id: IPLDKind,
        target_rpc: str = "http://127.0.0.1:5001",
    ) -> None:
        """
        Updates the pinned CID in the storage.

        Args:
            old_id (IPLDKind): The old Content ID to replace.
            new_id (IPLDKind): The new Content ID to pin.
            target_rpc (str): The RPC URL of the Kubo node.
        """
        params = {"arg": [str(old_id), str(new_id)]}
        pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update"
        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(pin_update_url_base, params=params)
            response.raise_for_status()

    async def pin_ls(
        self, target_rpc: str = "http://127.0.0.1:5001"
    ) -> list[Dict[str, Any]]:
        """
        Lists all pinned CIDs on the local Kubo node via the RPC API.

        Args:
            target_rpc (str): The RPC URL of the Kubo node.

        Returns:
            The value found under "Keys" in the JSON response of the daemon, passed
            through without conversion, or an empty list when that entry is missing.
            NOTE(review): the exact shape is whatever the daemon sends — confirm
            against the Kubo `pin/ls` reference before relying on the annotation.
        """
        pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(pin_ls_url_base)
            response.raise_for_status()
            pins = response.json().get("Keys", [])
            return pins
Connects to an IPFS Kubo daemon.
The IDs in save and load are IPLD CIDs.
- save() → RPC (`/api/v0/add`)
- load() → HTTP gateway (`/ipfs/{cid}`)
save uses the RPC API and load uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
Authentication / custom headers
You have two options:
- Bring your own `httpx.AsyncClient`: pass it via `client=...` — any default headers or auth configured on that client are reused for every request.
- Let `KuboCAS` build the client but pass `headers=` and/or `auth=` kwargs; they are forwarded to the internally-created `AsyncClient`.
import httpx
from py_hamt import KuboCAS
# Option 1: user-supplied client
client = httpx.AsyncClient(
headers={"Authorization": "Bearer <token>"},
auth=("user", "pass"),
)
cas = KuboCAS(client=client)
# Option 2: let KuboCAS create the client
cas = KuboCAS(
headers={"X-My-Header": "yes"},
auth=("user", "pass"),
)
Parameters
- hasher (str): multihash name (defaults to blake3).
- client (`httpx.AsyncClient | None`): reuse an existing client; if None KuboCAS will create one lazily.
- headers (dict[str, str] | None): default headers for the internally-created client.
- auth (`tuple[str, str] | None`): authentication tuple (username, password) for the internally-created client.
- rpc_base_url / gateway_base_url (str | None): override daemon endpoints (defaults match the local daemon ports).
- chunker (str): chunking algorithm specification for Kubo's `add` RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or `"rabin-<min>-<avg>-<max>"`.
...
def __init__(
    self,
    hasher: str = "blake3",
    client: httpx.AsyncClient | None = None,
    rpc_base_url: str | None = None,
    gateway_base_url: str | None = None,
    concurrency: int = 32,
    *,
    headers: dict[str, str] | None = None,
    auth: Tuple[str, str] | None = None,
    pin_on_add: bool = False,
    chunker: str = "size-1048576",
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
):
    """
    If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

    ### `httpx.AsyncClient` Management
    If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`
    as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.

    If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below:
    ```python
    async with httpx.AsyncClient() as client, KuboCAS(
        rpc_base_url=rpc_base_url,
        gateway_base_url=gateway_base_url,
        client=client,
    ) as kubo_cas:
        hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
        zhs = ZarrHAMTStore(hamt)
        # Use the KuboCAS instance as needed
        # ...
    ```
    As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
    ```python
    cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
    # Use the KuboCAS instance as needed
    # ...
    await cas.aclose()  # Ensure resources are cleaned up
    ```

    ### Authenticated RPC/Gateway Access
    Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
    Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
    If you do not need authentication, you can leave these parameters as `None`.

    ### RPC and HTTP Gateway Base URLs
    These are the first part of each URL. Defaults matching the endpoints that kubo launches with on a local machine are provided.

    ### Retry parameters
    `max_retries` must be non-negative, `initial_delay` must be positive and `backoff_factor` must be at least 1.0.

    Raises:
        ValueError: if `chunker` does not match the accepted formats, or if any retry parameter is out of range.
    """

    # Safe defaults first, so the instance is in a consistent "closed, owns
    # nothing" state if any of the validation below raises.
    self._owns_client: bool = False
    self._closed: bool = True
    self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
    # store for later use by _loop_client()
    self._default_headers = headers
    self._default_auth = auth

    # Now, perform ALL validation that might raise an exception, before the
    # instance is marked open further down.
    chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
    if re.fullmatch(chunker_pattern, chunker) is None:
        raise ValueError("Invalid chunker specification")
    if max_retries < 0:
        raise ValueError("max_retries must be non-negative")
    if initial_delay <= 0:
        raise ValueError("initial_delay must be positive")
    if backoff_factor < 1.0:
        raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")

    self.chunker: str = chunker

    self.hasher: str = hasher
    """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""

    self.max_retries = max_retries
    self.initial_delay = initial_delay
    self.backoff_factor = backoff_factor

    if rpc_base_url is None:
        rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
    if gateway_base_url is None:
        gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

    # Drop anything from "/ipfs/" onwards so the suffix is appended exactly once below.
    if "/ipfs/" in gateway_base_url:
        gateway_base_url = gateway_base_url.split("/ipfs/")[0]

    # Standard gateway URL construction with proper path handling
    if gateway_base_url.endswith("/"):
        gateway_base_url = f"{gateway_base_url}ipfs/"
    else:
        gateway_base_url = f"{gateway_base_url}/ipfs/"

    pin_string: str = "true" if pin_on_add else "false"
    self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
    """@private"""
    self.gateway_base_url: str = gateway_base_url
    """@private"""

    if client is not None:
        # A client was supplied by the user. We don't own it.
        # It is bound to the currently running loop, so this path requires
        # construction from inside a running event loop.
        self._owns_client = False
        self._client_per_loop = {asyncio.get_running_loop(): client}
    else:
        # No client supplied. We will own any clients we create.
        self._owns_client = True
        self._client_per_loop = {}

    self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
    self._closed = False
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
httpx.AsyncClient Management
If client is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using await cas.aclose()
as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.
If you are using the KuboCAS instance in an async with block, it will automatically close the client when the block is exited which is what we suggest below:
async with httpx.AsyncClient() as client, KuboCAS(
rpc_base_url=rpc_base_url,
gateway_base_url=gateway_base_url,
client=client,
) as kubo_cas:
hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
zhs = ZarrHAMTStore(hamt)
# Use the KuboCAS instance as needed
# ...
As mentioned, if you do not use the async with syntax, you should call await cas.aclose() when you are done using the instance to ensure that all resources are cleaned up.
cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose() # Ensure resources are cleaned up
Authenticated RPC/Gateway Access
Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own httpx.AsyncClient and then passing that in.
Alternatively, they can pass in headers and auth parameters to the constructor, which will be used to create a new httpx.AsyncClient if one is not provided.
If you do not need authentication, you can leave these parameters as None.
RPC and HTTP Gateway Base URLs
These are the first part of each URL. Defaults matching the endpoints that kubo launches with on a local machine are provided.
The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.
async def aclose(self) -> None:
    """
    Close every client this instance created itself. Must be awaited from an async context.

    Does nothing when the client was supplied by the caller, who remains
    responsible for closing it. Otherwise each still-open client is closed
    (errors during an individual close are ignored), the per-loop client
    table is emptied and the instance is flagged as closed.
    """
    if self._owns_client is False:
        # external client -> caller closes
        return

    # Iterate over a snapshot so the table can be cleared safely afterwards.
    for http_client in tuple(self._client_per_loop.values()):
        if http_client.is_closed:
            continue
        try:
            await http_client.aclose()
        except Exception:
            # best-effort cleanup
            pass

    self._client_per_loop.clear()
    self._closed = True
Closes all internally-created clients. Must be called from an async context.
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
    """
    POST `data` to `self.rpc_url` and return the CID decoded from the response's "Hash" field.

    The returned CID is re-tagged with `codec` unless its codec code equals
    `self.DAG_PB_MARKER`, in which case it is returned as decoded.

    Timeouts and request errors are retried up to `self.max_retries` times
    with exponential backoff plus jitter. Once retries are exhausted an
    `httpx.TimeoutException` is raised. `httpx.HTTPStatusError` is never
    retried and propagates immediately.
    """
    async with self._sem:
        upload = {"file": data}
        http = self._loop_client()
        failures = 0

        while failures <= self.max_retries:
            try:
                reply = await http.post(self.rpc_url, files=upload, timeout=60.0)
                reply.raise_for_status()
                cid: CID = CID.decode(reply.json()["Hash"])
                if cid.codec.code != self.DAG_PB_MARKER:
                    cid = cid.set(codec=codec)
                return cid

            except httpx.HTTPStatusError:
                # Re-raise non-timeout HTTP errors immediately
                raise

            except (httpx.TimeoutException, httpx.RequestError) as e:
                failures += 1
                if failures <= self.max_retries:
                    # Exponential backoff: initial_delay * backoff_factor^(failures - 1)
                    delay = self.initial_delay * (
                        self.backoff_factor ** (failures - 1)
                    )
                    # Add some jitter to prevent thundering herd
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)
                    continue

                raise httpx.TimeoutException(
                    f"Failed to save data after {self.max_retries} retries: {str(e)}",
                    request=e.request
                    if isinstance(e, httpx.RequestError)
                    else None,
                )
    raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
async def load(
    self,
    id: IPLDKind,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    """
    Load data from a CID using the IPFS gateway with optional Range requests.

    Args:
        id: The content ID to fetch; it is cast to `CID` and appended to `self.gateway_base_url`.
        offset: First byte to fetch. When given, it takes precedence over `suffix`.
        length: Number of bytes to fetch starting at `offset`. Only read when `offset` is given.
        suffix: Fetch the last `suffix` bytes. Only used when `offset` is None.

    Returns:
        The raw response body.

    Raises:
        httpx.TimeoutException: after timeouts or request errors persist past `self.max_retries` retries.
        httpx.HTTPStatusError: immediately, without retrying, when the response status check fails.
    """
    cid = cast(CID, id)
    url: str = f"{self.gateway_base_url + str(cid)}"
    headers: Dict[str, str] = {}

    # Construct the Range header if required
    if offset is not None:
        start = offset
        if length is not None:
            # Standard HTTP Range: bytes=start-end (inclusive)
            end = start + length - 1
            headers["Range"] = f"bytes={start}-{end}"
        else:
            # Standard HTTP Range: bytes=start- (from start to end)
            headers["Range"] = f"bytes={start}-"
    elif suffix is not None:
        # Standard HTTP Range: bytes=-N (last N bytes)
        headers["Range"] = f"bytes=-{suffix}"

    # Instrumentation bookkeeping: these values are reported exactly once by
    # the `finally` below, whichever way the request loop exits.
    trace_started_at = instrumentation.begin_cas_load(cid, bool(headers))
    response_bytes = 0
    final_status = "ok"
    final_retry_count = 0
    try:
        async with self._sem:  # Throttle gateway
            client = self._loop_client()
            retry_count = 0

            while retry_count <= self.max_retries:
                try:
                    # An empty headers dict is sent as None (no extra headers).
                    response = await client.get(
                        url, headers=headers or None, timeout=60.0
                    )
                    response.raise_for_status()
                    content = response.content
                    response_bytes = len(content)
                    final_retry_count = retry_count
                    return content

                except (httpx.TimeoutException, httpx.RequestError) as e:
                    retry_count += 1
                    if retry_count > self.max_retries:
                        # Retries exhausted: reported as "timeout" whichever of
                        # the two caught exception types triggered it.
                        final_status = "timeout"
                        final_retry_count = retry_count
                        raise httpx.TimeoutException(
                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
                            request=e.request
                            if isinstance(e, httpx.RequestError)
                            else None,
                        )

                    # Calculate backoff delay with jitter
                    delay = self.initial_delay * (
                        self.backoff_factor ** (retry_count - 1)
                    )
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)

                except httpx.HTTPStatusError:
                    # Re-raise non-timeout HTTP errors immediately
                    final_status = "http_error"
                    final_retry_count = retry_count
                    raise
    finally:
        instrumentation.end_cas_load(
            trace_started_at,
            byte_count=response_bytes,
            retries=final_retry_count,
            status=final_status,
        )
    raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
Load data from a CID using the IPFS gateway with optional Range requests.
async def pin_cid(
    self,
    cid: CID,
    target_rpc: str = "http://127.0.0.1:5001",
) -> None:
    """
    Pin a CID on a Kubo node through its RPC API.

    The request is sent with `recursive=true`, so linked objects are pinned as well.

    Args:
        cid (CID): The Content ID to pin.
        target_rpc (str): The RPC URL of the Kubo node.

    Raises:
        httpx.HTTPStatusError: if the node answers with an error status.
    """
    query = {"arg": str(cid), "recursive": "true"}
    endpoint: str = f"{target_rpc}/api/v0/pin/add"

    # throttle RPC
    async with self._sem:
        reply = await self._loop_client().post(endpoint, params=query)
        reply.raise_for_status()
Pins a CID to the local Kubo node via the RPC API.
This call is recursive by default, pinning all linked objects.
Args: cid (CID): The Content ID to pin. target_rpc (str): The RPC URL of the Kubo node.
async def unpin_cid(
    self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
) -> None:
    """
    Unpin a CID on a Kubo node through its RPC API.

    The request is sent with `recursive=true`.

    Args:
        cid (CID): The Content ID to unpin.
        target_rpc (str): The RPC URL of the Kubo node.

    Raises:
        httpx.HTTPStatusError: if the node answers with an error status.
    """
    query = {"arg": str(cid), "recursive": "true"}
    endpoint: str = f"{target_rpc}/api/v0/pin/rm"

    # throttle RPC
    async with self._sem:
        reply = await self._loop_client().post(endpoint, params=query)
        reply.raise_for_status()
Unpins a CID from the local Kubo node via the RPC API.
Args: cid (CID): The Content ID to unpin.
async def pin_update(
    self,
    old_id: IPLDKind,
    new_id: IPLDKind,
    target_rpc: str = "http://127.0.0.1:5001",
) -> None:
    """
    Replace an existing pin with a new one through the Kubo RPC API.

    Both IDs are stringified and sent, old first, as the `arg` parameters
    of the `pin/update` endpoint.

    Args:
        old_id (IPLDKind): The old Content ID to replace.
        new_id (IPLDKind): The new Content ID to pin.
        target_rpc (str): The RPC URL of the Kubo node.

    Raises:
        httpx.HTTPStatusError: if the node answers with an error status.
    """
    query = {"arg": [str(old_id), str(new_id)]}
    endpoint: str = f"{target_rpc}/api/v0/pin/update"

    # throttle RPC
    async with self._sem:
        reply = await self._loop_client().post(endpoint, params=query)
        reply.raise_for_status()
Updates the pinned CID in the storage.
Args: old_id (IPLDKind): The old Content ID to replace. new_id (IPLDKind): The new Content ID to pin.
async def pin_ls(
    self, target_rpc: str = "http://127.0.0.1:5001"
) -> list[Dict[str, Any]]:
    """
    List the pins reported by a Kubo node through its RPC API.

    Args:
        target_rpc (str): The RPC URL of the Kubo node.

    Returns:
        The value of the "Keys" field of the JSON response, or an empty list
        when that field is absent.
        NOTE(review): the Kubo RPC reference describes "Keys" as an object
        keyed by CID rather than a list - confirm the return annotation.

    Raises:
        httpx.HTTPStatusError: if the node answers with an error status.
    """
    endpoint: str = f"{target_rpc}/api/v0/pin/ls"

    # throttle RPC
    async with self._sem:
        reply = await self._loop_client().post(endpoint)
        reply.raise_for_status()
        return reply.json().get("Keys", [])
Lists all pinned CIDs on the local Kubo node via the RPC API.
Args: target_rpc (str): The RPC URL of the Kubo node.
Returns: List[CID]: A list of pinned CIDs.
class ZarrHAMTStore(zarr.abc.store.Store):
    """
    Write and read Zarr v3s with a HAMT.

    Read **or** write a Zarr-v3 store whose key/value pairs live inside a
    py-hamt mapping.

    Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and
    the value is the raw byte payload produced by Zarr. No additional
    framing, compression, or encryption is applied by this class. For a Zarr
    encryption example where metadata remains available, see the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
    For a fully encrypted zarr store, where metadata is not available, please see
    :class:`SimpleEncryptedZarrHAMTStore` but we do not recommend using it.

    #### A note about using the same `ZarrHAMTStore` for writing and then reading again
    If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.

    #### Sample Code
    ```python
    # --- Write ---
    ds: xarray.Dataset = # ...
    cas: ContentAddressedStore = # ...
    # make sure values_are_bytes is True and read_only is False to enable writes
    hamt = await HAMT.build(cas, values_are_bytes=True)     # write-enabled
    zhs  = ZarrHAMTStore(hamt, read_only=False)
    ds.to_zarr(store=zhs, mode="w", zarr_format=3)
    await hamt.make_read_only() # flush + freeze
    root_node_id = hamt.root_node_id
    print(root_node_id)

    # --- read ---
    hamt_ro = await HAMT.build(
        cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    zhs_ro  = ZarrHAMTStore(hamt_ro, read_only=True)
    ds_ro = xarray.open_zarr(store=zhs_ro)


    print(ds_ro)
    xarray.testing.assert_identical(ds, ds_ro)
    ```
    """

    _forced_read_only: bool | None = None  # sentinel for wrapper clones

    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
        """
        ### `hamt` and `read_only`
        You need to make sure the following two things are true:

        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.

        ##### A note about the zarr chunk separator, "/" vs "."
        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

        #### Metadata Read Cache
        `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
        """
        super().__init__(read_only=read_only)

        # Kept as asserts (AssertionError) for backward compatibility; the
        # messages make a failed precondition self-explanatory.
        assert hamt.read_only == read_only, (
            "hamt.read_only must match the read_only argument"
        )
        assert hamt.values_are_bytes, "hamt must be built with values_are_bytes=True"
        self.hamt: HAMT = hamt
        """
        The internal HAMT.
        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
        """

        self.metadata_read_cache: dict[str, bytes] = {}
        """@private"""

    def _map_byte_request(
        self, byte_range: Optional[zarr.abc.store.ByteRequest]
    ) -> tuple[Optional[int], Optional[int], Optional[int]]:
        """Helper to map Zarr ByteRequest to offset, length, suffix."""
        offset: Optional[int] = None
        length: Optional[int] = None
        suffix: Optional[int] = None

        if byte_range:
            if isinstance(byte_range, zarr.abc.store.RangeByteRequest):
                offset = byte_range.start
                length = byte_range.end - byte_range.start
                if length < 0:
                    raise ValueError("End must be >= start for RangeByteRequest")
            elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest):
                offset = byte_range.offset
            elif isinstance(byte_range, zarr.abc.store.SuffixByteRequest):
                suffix = byte_range.suffix
            else:
                raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}")

        return offset, length, suffix

    @property
    def read_only(self) -> bool:  # type: ignore[override]
        if self._forced_read_only is not None:  # instance attr overrides
            return self._forced_read_only
        return self.hamt.read_only

    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
        """
        Return this store (if the flag already matches) or a *shallow*
        clone that presents the requested read‑only status.

        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
        instance; no flushing, network traffic or async work is done.
        """
        # Fast path
        if read_only == self.read_only:
            return self  # Same mode, return same instance

        # Create new instance with different read_only flag
        # Creates a *bare* instance without running its __init__
        clone = type(self).__new__(type(self))

        # Copy attributes that matter
        clone.hamt = self.hamt  # Share the HAMT
        clone._forced_read_only = read_only
        clone.metadata_read_cache = self.metadata_read_cache.copy()

        # Re‑initialise the zarr base class so that Zarr sees the flag
        zarr.abc.store.Store.__init__(clone, read_only=read_only)
        return clone

    def __eq__(self, other: object) -> bool:
        """@private"""
        if not isinstance(other, ZarrHAMTStore):
            return False
        return self.hamt.root_node_id == other.hamt.root_node_id

    async def get(
        self,
        key: str,
        prototype: zarr.core.buffer.BufferPrototype,
        byte_range: zarr.abc.store.ByteRequest | None = None,
    ) -> zarr.core.buffer.Buffer | None:
        """@private"""
        with instrumentation.span(
            "py_hamt.hamt_store.get",
            {
                "py_hamt.zarr.key": key,
                "py_hamt.zarr.byte_range": byte_range is not None,
            },
        ):
            started_at = time.perf_counter()
            hit = False
            # Metadata documents are the keys whose path ends with zarr.json
            is_metadata = key.endswith("zarr.json")
            try:
                val: bytes
                # Only whole-value metadata reads are served from / stored in the cache
                if (
                    is_metadata
                    and byte_range is None
                    and key in self.metadata_read_cache
                ):
                    val = self.metadata_read_cache[key]
                else:
                    offset, length, suffix = self._map_byte_request(byte_range)
                    val = cast(
                        bytes,
                        await self.hamt.get(
                            key, offset=offset, length=length, suffix=suffix
                        ),
                    )  # We know values received will always be bytes since we only store bytes in the HAMT
                    if is_metadata and byte_range is None:
                        self.metadata_read_cache[key] = val

                hit = True
                return prototype.buffer.from_bytes(val)
            except KeyError:
                # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases
                return None
            except Exception as e:
                print(f"Error getting key '{key}' with range {byte_range}: {e}")
                raise
            finally:
                instrumentation.record_zarr_get(
                    store="hamt_store",
                    key=key,
                    kind="metadata" if is_metadata else "chunk",
                    hit=hit,
                    seconds=time.perf_counter() - started_at,
                    byte_range=byte_range is not None,
                )

    async def get_partial_values(
        self,
        prototype: zarr.core.buffer.BufferPrototype,
        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
    ) -> list[zarr.core.buffer.Buffer | None]:
        """
        Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
        """
        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
        results = await asyncio.gather(
            *tasks, return_exceptions=False
        )  # Set return_exceptions=True for debugging
        return results

    async def exists(self, key: str) -> bool:
        """@private"""
        try:
            await self.hamt.get(key)
            return True
        except KeyError:
            return False

    @property
    def supports_writes(self) -> bool:
        """@private"""
        return not self.hamt.read_only

    @property
    def supports_partial_writes(self) -> bool:
        """@private"""
        return False

    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """@private"""
        if self.read_only:
            raise Exception("Cannot write to a read-only store.")

        # Serialise once and reuse for both the cache refresh and the HAMT write.
        payload = value.to_bytes()
        if key in self.metadata_read_cache:
            self.metadata_read_cache[key] = payload
        await self.hamt.set(key, payload)

    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """@private"""
        if not (await self.exists(key)):
            await self.set(key, value)

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        """@private"""
        raise NotImplementedError

    @property
    def supports_deletes(self) -> bool:
        """@private"""
        return not self.hamt.read_only

    async def delete(self, key: str) -> None:
        """@private"""
        if self.read_only:
            raise Exception("Cannot write to a read-only store.")
        try:
            await self.hamt.delete(key)
        except KeyError:
            # It's fine if the key was not in the HAMT
            # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues
            pass
        finally:
            # Drop any cached copy so a later get() cannot serve metadata
            # that has been deleted from the HAMT.
            self.metadata_read_cache.pop(key, None)

    @property
    def supports_listing(self) -> bool:
        """@private"""
        return True

    async def list(self) -> AsyncIterator[str]:
        """@private"""
        async for key in self.hamt.keys():
            yield key

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        """@private"""
        async for key in self.hamt.keys():
            if key.startswith(prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        """
        @private
        List *immediate* children that live directly under **prefix**.

        This is similar to :py:meth:`list_prefix` but collapses everything
        below the first ``"/"`` after *prefix*.  Each child name is yielded
        **exactly once** in the order of first appearance while scanning the
        HAMT keys.

        Parameters
        ----------
        prefix : str
            Logical directory path.  *Must* end with ``"/"`` for the result to
            make sense (e.g. ``"a/b/"``).

        Yields
        ------
        str
            The name of each direct child (file or sub-directory) of *prefix*.

        Examples
        --------
        With keys ::

            a/b/c/d
            a/b/c/e
            a/b/f
            a/b/g/h/i

        ``await list_dir("a/b/")`` produces ::

            c
            f
            g

        Notes
        -----
        • Internally uses a :class:`set` to deduplicate names; memory grows
        with the number of *unique* children, not the total number of keys.
        • Order is **not** sorted; it reflects the first encounter while
        iterating over :py:meth:`HAMT.keys`.
        """
        seen_names: set[str] = set()
        async for key in self.hamt.keys():
            if key.startswith(prefix):
                suffix: str = key[len(prefix) :]
                first_slash: int = suffix.find("/")
                if first_slash == -1:
                    if suffix not in seen_names:
                        seen_names.add(suffix)
                        yield suffix
                else:
                    name: str = suffix[0:first_slash]
                    if name not in seen_names:
                        seen_names.add(name)
                        yield name
Write and read Zarr v3s with a HAMT.
Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.
Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT) and
the value is the raw byte payload produced by Zarr. No additional
framing, compression, or encryption is applied by this class. For a Zarr encryption example
where metadata remains available, see the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
For a fully encrypted zarr store, where metadata is not available, please see
SimpleEncryptedZarrHAMTStore but we do not recommend using it.
A note about using the same ZarrHAMTStore for writing and then reading again
If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.
Sample Code
# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True) # write-enabled
zhs = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)
# --- read ---
hamt_ro = await HAMT.build(
cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)
print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
    """
    ### `hamt` and `read_only`
    You need to make sure the following two things are true:

    1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
    2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.

    ##### A note about the zarr chunk separator, "/" vs "."
    While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

    #### Metadata Read Cache
    `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
    """
    super().__init__(read_only=read_only)

    # Kept as asserts (AssertionError) for backward compatibility; the
    # messages make a failed precondition self-explanatory.
    assert hamt.read_only == read_only, (
        "hamt.read_only must match the read_only argument"
    )
    assert hamt.values_are_bytes, "hamt must be built with values_are_bytes=True"
    self.hamt: HAMT = hamt
    """
    The internal HAMT.
    Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
    """

    self.metadata_read_cache: dict[str, bytes] = {}
    """@private"""
hamt and read_only
You need to make sure the following two things are true:
- The HAMT is in the same read-only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async.
- The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."
While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
Metadata Read Cache
ZarrHAMTStore has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
@property
def read_only(self) -> bool:  # type: ignore[override]
    """
    Whether this store presents itself as read-only.

    A non-None `_forced_read_only` wins; otherwise the flag of the
    underlying HAMT is reported.
    """
    forced = self._forced_read_only
    return self.hamt.read_only if forced is None else forced
Is the store read-only?
def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
    """
    Give back a store that reports the requested read-only flag.

    When the flag already matches, the very same instance is returned.
    Otherwise a new instance of the same type is created without running
    its ``__init__``. It shares this store's HAMT object, receives a copy
    of the metadata read cache and has the Zarr base class initialised
    with the requested flag. No flushing, network traffic or async work
    is done here.
    """
    if read_only == self.read_only:
        # Nothing to change: hand back the same instance.
        return self

    store_type = type(self)
    twin = store_type.__new__(store_type)  # bare instance, __init__ is skipped

    twin._forced_read_only = read_only
    twin.metadata_read_cache = self.metadata_read_cache.copy()
    twin.hamt = self.hamt  # shared with the original, not copied

    # Initialise the zarr base class so that Zarr sees the flag
    zarr.abc.store.Store.__init__(twin, read_only=read_only)
    return twin
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same ~py_hamt.hamt.HAMT
instance; no flushing, network traffic or async work is done.
async def get_partial_values(
    self,
    prototype: zarr.core.buffer.BufferPrototype,
    key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
) -> list[zarr.core.buffer.Buffer | None]:
    """
    Fetch several keys (optionally restricted to byte ranges) concurrently.

    One `self.get` call is created per `(key, byte_range)` pair and all of
    them are awaited together through `asyncio.gather`; the results come
    back in the order of `key_ranges`.
    """
    pending = []
    for key, byte_range in key_ranges:
        pending.append(self.get(key, prototype, byte_range))

    # Set return_exceptions=True for debugging
    return await asyncio.gather(*pending, return_exceptions=False)
Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore):
    """
    Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.

    This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
    stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
    and data chunks. This provides strong privacy but means the Zarr store is
    completely opaque and unusable without the correct encryption key and header.

    Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb

    #### Encryption Details
    - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
    - Requires a 32-byte encryption key and a header.
    - Each encrypted value includes a 24-byte nonce and a 16-byte tag.

    #### Important Considerations
    - Since metadata is encrypted, standard Zarr tools cannot inspect the
      dataset without prior decryption using this store class.
    - There is no support for partial encryption or excluding variables.
    - Decrypted `zarr.json` values are kept in an in-memory cache
      (`metadata_read_cache`) after their first read; no other values are
      cached by this class.
    - Byte-range reads decrypt the whole stored value and then slice the
      plaintext, because the authentication tag covers the entire value.

    #### Sample Code
    ```python
    import xarray
    from py_hamt import HAMT, KuboCAS, SimpleEncryptedZarrHAMTStore  # Assuming a KuboCAS or similar
    from Crypto.Random import get_random_bytes
    import numpy as np

    # Setup
    ds = xarray.Dataset(
        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
    )
    cas = KuboCAS() # Example ContentAddressedStore
    encryption_key = get_random_bytes(32)
    header = b"fully-encrypted-zarr"

    # --- Write ---
    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
    ezhs_write = SimpleEncryptedZarrHAMTStore(
        hamt_write, False, encryption_key, header
    )
    print("Writing fully encrypted Zarr...")
    ds.to_zarr(store=ezhs_write, mode="w")
    await hamt_write.make_read_only()
    root_node_id = hamt_write.root_node_id
    print(f"Wrote Zarr with root: {root_node_id}")

    # --- Read ---
    hamt_read = await HAMT.build(
        cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
    )
    ezhs_read = SimpleEncryptedZarrHAMTStore(
        hamt_read, True, encryption_key, header
    )
    print("\\nReading fully encrypted Zarr...")
    ds_read = xarray.open_zarr(store=ezhs_read)
    print("Read back dataset:")
    print(ds_read)
    xarray.testing.assert_identical(ds, ds_read)
    print("Read successful and data verified.")

    # --- Read with wrong key (demonstrates failure) ---
    wrong_key = get_random_bytes(32)
    hamt_bad = await HAMT.build(
        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    ezhs_bad = SimpleEncryptedZarrHAMTStore(
        hamt_bad, True, wrong_key, header
    )
    print("\\nAttempting to read with wrong key...")
    try:
        ds_bad = xarray.open_zarr(store=ezhs_bad)
        print(ds_bad)
    except Exception as e:
        print(f"Failed to read as expected: {type(e).__name__} - {e}")
    ```
    """

    def __init__(
        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
    ) -> None:
        """
        Initializes the SimpleEncryptedZarrHAMTStore.

        Args:
            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
                  Its `read_only` status must match the `read_only` argument.
            read_only: If True, the store is in read-only mode.
            encryption_key: A 32-byte key for ChaCha20-Poly1305.
            header: A header (bytes) used as associated data in encryption.

        Raises:
            ValueError: If `encryption_key` is not exactly 32 bytes long.
        """
        super().__init__(hamt, read_only=read_only)

        if len(encryption_key) != 32:
            raise ValueError("Encryption key must be exactly 32 bytes long.")
        self.encryption_key = encryption_key
        self.header = header
        # Decrypted `zarr.json` values, keyed by store key (filled in by `get`).
        self.metadata_read_cache: dict[str, bytes] = {}

    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
        """
        Return a store presenting the requested read-only flag.

        When the flag already matches, `self` is returned. Otherwise a new
        instance is created without running `__init__`; it shares this store's
        HAMT, encryption key, header and metadata cache (the same objects, not
        copies).
        """
        if read_only == self.read_only:
            return self

        clone = type(self).__new__(type(self))
        clone.hamt = self.hamt
        clone.encryption_key = self.encryption_key
        clone.header = self.header
        clone.metadata_read_cache = self.metadata_read_cache
        clone._forced_read_only = read_only  # safe; attribute is declared
        # Initialise the zarr base class directly so the clone carries the new flag.
        zarr.abc.store.Store.__init__(clone, read_only=read_only)
        return clone

    def _encrypt(self, val: bytes) -> bytes:
        """Encrypts data using ChaCha20-Poly1305.

        Returns `nonce (24 bytes) + tag (16 bytes) + ciphertext`; `self.header`
        is authenticated as associated data.
        """
        nonce = get_random_bytes(24)  # XChaCha20 uses a 24-byte nonce
        cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
        cipher.update(self.header)
        ciphertext, tag = cipher.encrypt_and_digest(val)
        return nonce + tag + ciphertext

    def _decrypt(self, val: bytes) -> bytes:
        """Decrypts data using ChaCha20-Poly1305.

        Expects the layout produced by `_encrypt`.

        Raises:
            ValueError: If decryption or tag verification fails for any reason.
        """
        try:
            # Extract nonce (24), tag (16), and ciphertext
            nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
            cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
            cipher.update(self.header)
            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
            return plaintext
        except Exception as e:
            # Catching a broad exception as various issues (key, tag, length) can occur.
            raise ValueError(
                "Decryption failed. Check key, header, or data integrity."
            ) from e

    @staticmethod
    def _slice_byte_range(
        data: bytes, byte_range: zarr.abc.store.ByteRequest | None
    ) -> bytes:
        """Return the part of the decrypted `data` selected by `byte_range`.

        `None` selects everything. Slicing clamps out-of-range bounds, so an
        oversized request yields a shorter (possibly empty) result.

        Raises:
            ValueError: If `byte_range` is not one of the known request types.
        """
        if byte_range is None:
            return data
        if isinstance(byte_range, zarr.abc.store.RangeByteRequest):
            return data[byte_range.start : byte_range.end]
        if isinstance(byte_range, zarr.abc.store.OffsetByteRequest):
            return data[byte_range.offset :]
        if isinstance(byte_range, zarr.abc.store.SuffixByteRequest):
            # `data[-0:]` would return everything, so compute the start explicitly.
            return data[max(len(data) - byte_range.suffix, 0) :]
        raise ValueError(f"Unsupported byte_range request: {byte_range!r}")

    def __eq__(self, other: object) -> bool:
        """@private"""
        # Equal when both stores point at the same HAMT root and would decrypt
        # identically (same key and same associated-data header).
        if not isinstance(other, SimpleEncryptedZarrHAMTStore):
            return False
        return (
            self.hamt.root_node_id == other.hamt.root_node_id
            and self.encryption_key == other.encryption_key
            and self.header == other.header
        )

    async def get(
        self,
        key: str,
        prototype: zarr.core.buffer.BufferPrototype,
        byte_range: zarr.abc.store.ByteRequest | None = None,
    ) -> zarr.core.buffer.Buffer | None:
        """@private"""
        # Returns None when the HAMT has no such key (KeyError). Decryption
        # failures surface as ValueError from `_decrypt`.
        try:
            decrypted_val: bytes
            is_metadata: bool = key.endswith("zarr.json")

            if is_metadata and key in self.metadata_read_cache:
                decrypted_val = self.metadata_read_cache[key]
            else:
                raw_val = cast(
                    bytes, await self.hamt.get(key)
                )  # We know values received will always be bytes since we only store bytes in the HAMT
                decrypted_val = self._decrypt(raw_val)
                if is_metadata:
                    # Cache the full plaintext; any byte range is applied below.
                    self.metadata_read_cache[key] = decrypted_val
            # The value is authenticated as a whole, so a partial read can only
            # be served by slicing the fully decrypted plaintext.
            return prototype.buffer.from_bytes(
                self._slice_byte_range(decrypted_val, byte_range)
            )
        except KeyError:
            return None

    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """@private"""
        if self.read_only:
            raise Exception("Cannot write to a read-only store.")

        raw_bytes = value.to_bytes()
        # Keep an already-cached metadata entry in sync with what is written.
        if key in self.metadata_read_cache:
            self.metadata_read_cache[key] = raw_bytes
        # Encrypt it
        encrypted_bytes = self._encrypt(raw_bytes)
        await self.hamt.set(key, encrypted_bytes)
Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.
This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.
Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.
#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
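- Decrypted `zarr.json` metadata is cached in memory after its first read (`metadata_read_cache`); no other values are cached by this store.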
#### Sample Code
import xarray
from py_hamt import HAMT, KuboCAS, SimpleEncryptedZarrHAMTStore # Assuming a KuboCAS or similar
from Crypto.Random import get_random_bytes
import numpy as np
# Setup
ds = xarray.Dataset(
{"data": (("y", "x"), np.arange(12).reshape(3, 4))},
coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
)
cas = KuboCAS() # Example ContentAddressedStore
encryption_key = get_random_bytes(32)
header = b"fully-encrypted-zarr"
# --- Write ---
hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
ezhs_write = SimpleEncryptedZarrHAMTStore(
hamt_write, False, encryption_key, header
)
print("Writing fully encrypted Zarr...")
ds.to_zarr(store=ezhs_write, mode="w")
await hamt_write.make_read_only()
root_node_id = hamt_write.root_node_id
print(f"Wrote Zarr with root: {root_node_id}")
# --- Read ---
hamt_read = await HAMT.build(
cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
)
ezhs_read = SimpleEncryptedZarrHAMTStore(
hamt_read, True, encryption_key, header
)
print("\nReading fully encrypted Zarr...")
ds_read = xarray.open_zarr(store=ezhs_read)
print("Read back dataset:")
print(ds_read)
xarray.testing.assert_identical(ds, ds_read)
print("Read successful and data verified.")
# --- Read with wrong key (demonstrates failure) ---
wrong_key = get_random_bytes(32)
hamt_bad = await HAMT.build(
cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
ezhs_bad = SimpleEncryptedZarrHAMTStore(
hamt_bad, True, wrong_key, header
)
print("\nAttempting to read with wrong key...")
try:
ds_bad = xarray.open_zarr(store=ezhs_bad)
print(ds_bad)
except Exception as e:
print(f"Failed to read as expected: {type(e).__name__} - {e}")
def __init__(
    self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
) -> None:
    """
    Set up an encrypted Zarr store on top of an existing HAMT.

    Args:
        hamt: HAMT used as the backing storage. It is expected to have been
            built with `values_are_bytes=True` and with a `read_only` status
            matching `read_only`; neither is checked here.
        read_only: Whether the store rejects writes.
        encryption_key: ChaCha20-Poly1305 key; must be exactly 32 bytes.
        header: Bytes used as associated data for every encrypted value.

    Raises:
        ValueError: If `encryption_key` is not exactly 32 bytes long.
    """
    super().__init__(hamt, read_only=read_only)

    required_key_length = 32
    if len(encryption_key) != required_key_length:
        raise ValueError("Encryption key must be exactly 32 bytes long.")

    # Starts empty; holds decrypted metadata values keyed by store key.
    self.metadata_read_cache: dict[str, bytes] = {}
    self.header = header
    self.encryption_key = encryption_key
Initializes the SimpleEncryptedZarrHAMTStore.
Args:
hamt: The HAMT instance for storage. Must have values_are_bytes=True.
Its read_only status must match the read_only argument.
read_only: If True, the store is in read-only mode.
encryption_key: A 32-byte key for ChaCha20-Poly1305.
header: A header (bytes) used as associated data in encryption.
def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
    """
    Return a store presenting the requested read-only flag.

    When the flag already matches, `self` is returned unchanged. Otherwise a
    new instance of the same class is created without running `__init__`; it
    shares this store's HAMT, encryption key, header and metadata cache (the
    same objects, not copies).
    """
    if read_only == self.read_only:
        return self

    store_cls = type(self)
    twin = store_cls.__new__(store_cls)
    # Share state by reference, in the same order the attributes are normally set.
    for shared_attr in ("hamt", "encryption_key", "header", "metadata_read_cache"):
        setattr(twin, shared_attr, getattr(self, shared_attr))
    twin._forced_read_only = read_only  # safe; attribute is declared
    # Initialise the zarr base class last so the twin carries the new flag.
    zarr.abc.store.Store.__init__(twin, read_only=read_only)
    return twin
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same `py_hamt.hamt.HAMT`
instance; no flushing, network traffic or async work is done.
145class ShardedZarrStore(zarr.abc.store.Store): 146 """ 147 Implements the Zarr Store API using a sharded layout for chunk CIDs. 148 149 This store divides the flat index of chunk CIDs into multiple "shards". 150 Each shard is a DAG-CBOR array where each element is either a CID link 151 to a chunk or a null value if the chunk is empty. This structure allows 152 for efficient traversal by IPLD-aware systems. 153 154 The store's root object contains: 155 1. A dictionary mapping metadata keys (like 'zarr.json') to their CIDs. 156 2. A list of CIDs, where each CID points to a shard object. 157 3. Sharding configuration details (e.g., chunks_per_shard). 158 """ 159 160 def __init__( 161 self, 162 cas: ContentAddressedStore, 163 read_only: bool, 164 root_cid: Optional[str] = None, 165 *, 166 max_cache_memory_bytes: int = 100 * 1024 * 1024, # 100MB default 167 ): 168 """Use the async `open()` classmethod to instantiate this class.""" 169 super().__init__(read_only=read_only) 170 self.cas = cas 171 self._root_cid = root_cid 172 self._root_obj: dict 173 174 self._resize_lock = asyncio.Lock() 175 # An event to signal when a resize is in-progress. 176 # It starts in the "set" state, allowing all operations to proceed. 177 self._resize_complete = asyncio.Event() 178 self._resize_complete.set() 179 self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock) 180 181 self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes) 182 self._pending_shard_loads: Dict[int, asyncio.Event] = {} 183 self._metadata_read_cache: Dict[str, bytes] = {} 184 185 self._array_shape: Tuple[int, ...] 186 self._chunk_shape: Tuple[int, ...] 187 self._chunks_per_dim: Tuple[int, ...] 
188 self._chunks_per_shard: int 189 self._num_shards: int = 0 190 self._total_chunks: int = 0 191 192 self._dirty_root = False 193 194 def __update_geometry(self): 195 """Calculates derived geometric properties from the base shapes.""" 196 197 if not all(cs > 0 for cs in self._chunk_shape): 198 raise ValueError("All chunk_shape dimensions must be positive.") 199 if not all(s >= 0 for s in self._array_shape): 200 raise ValueError("All array_shape dimensions must be non-negative.") 201 202 self._chunks_per_dim = tuple( 203 math.ceil(a / c) if c > 0 else 0 204 for a, c in zip(self._array_shape, self._chunk_shape) 205 ) 206 self._total_chunks = math.prod(self._chunks_per_dim) 207 208 if not self._total_chunks == 0: 209 self._num_shards = ( 210 self._total_chunks + self._chunks_per_shard - 1 211 ) // self._chunks_per_shard 212 213 @classmethod 214 async def open( 215 cls, 216 cas: ContentAddressedStore, 217 read_only: bool, 218 root_cid: Optional[str] = None, 219 *, 220 array_shape: Optional[Tuple[int, ...]] = None, 221 chunk_shape: Optional[Tuple[int, ...]] = None, 222 chunks_per_shard: Optional[int] = None, 223 max_cache_memory_bytes: int = 100 * 1024 * 1024, # 100MB default 224 ) -> "ShardedZarrStore": 225 """ 226 Asynchronously opens an existing ShardedZarrStore or initializes a new one. 227 """ 228 store = cls( 229 cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes 230 ) 231 if root_cid: 232 await store._load_root_from_cid() 233 elif not read_only: 234 if array_shape is None or chunk_shape is None: 235 raise ValueError( 236 "array_shape and chunk_shape must be provided for a new store." 
237 ) 238 239 if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0: 240 raise ValueError("chunks_per_shard must be a positive integer.") 241 242 store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard) 243 else: 244 raise ValueError("root_cid must be provided for a read-only store.") 245 return store 246 247 def _initialize_new_root( 248 self, 249 array_shape: Tuple[int, ...], 250 chunk_shape: Tuple[int, ...], 251 chunks_per_shard: int, 252 ): 253 self._array_shape = array_shape 254 self._chunk_shape = chunk_shape 255 self._chunks_per_shard = chunks_per_shard 256 257 self.__update_geometry() 258 259 self._root_obj = { 260 "manifest_version": "sharded_zarr_v1", 261 "metadata": {}, 262 "chunks": { 263 "array_shape": list(self._array_shape), 264 "chunk_shape": list(self._chunk_shape), 265 "sharding_config": { 266 "chunks_per_shard": self._chunks_per_shard, 267 }, 268 "shard_cids": [None] * self._num_shards, 269 }, 270 } 271 self._dirty_root = True 272 273 async def _load_root_from_cid(self): 274 root_bytes = await self.cas.load(self._root_cid) 275 try: 276 self._root_obj = dag_cbor.decode(root_bytes) 277 if not isinstance(self._root_obj, dict) or "chunks" not in self._root_obj: 278 raise ValueError( 279 "Root object is not a valid dictionary with 'chunks' key." 280 ) 281 if not isinstance(self._root_obj["chunks"]["shard_cids"], list): 282 raise ValueError("shard_cids is not a list.") 283 except Exception as e: 284 raise ValueError(f"Failed to decode root object: {e}") 285 286 if self._root_obj.get("manifest_version") != "sharded_zarr_v1": 287 raise ValueError( 288 f"Incompatible manifest version: {self._root_obj.get('manifest_version')}. Expected 'sharded_zarr_v1'." 
289 ) 290 291 chunk_info = self._root_obj["chunks"] 292 self._array_shape = tuple(chunk_info["array_shape"]) 293 self._chunk_shape = tuple(chunk_info["chunk_shape"]) 294 self._chunks_per_shard = chunk_info["sharding_config"]["chunks_per_shard"] 295 296 self.__update_geometry() 297 298 if len(chunk_info["shard_cids"]) != self._num_shards: 299 raise ValueError( 300 f"Inconsistent number of shards. Expected {self._num_shards}, found {len(chunk_info['shard_cids'])}." 301 ) 302 303 async def _fetch_and_cache_full_shard( 304 self, 305 shard_idx: int, 306 shard_cid: str, 307 max_retries: int = 3, 308 retry_delay: float = 1.0, 309 ) -> None: 310 """ 311 Fetch a shard from CAS and cache it, with retry logic for transient errors. 312 313 Args: 314 shard_idx: The index of the shard to fetch. 315 shard_cid: The CID of the shard. 316 max_retries: Maximum number of retry attempts for transient errors. 317 retry_delay: Delay between retry attempts in seconds. 318 """ 319 for attempt in range(max_retries): 320 try: 321 shard_data_bytes = await self.cas.load(shard_cid) 322 decoded_shard = dag_cbor.decode(shard_data_bytes) 323 if not isinstance(decoded_shard, list): 324 raise TypeError(f"Shard {shard_idx} did not decode to a list.") 325 await self._shard_data_cache.put(shard_idx, decoded_shard) 326 # Always set the Event to unblock waiting coroutines 327 if shard_idx in self._pending_shard_loads: 328 self._pending_shard_loads[shard_idx].set() 329 del self._pending_shard_loads[shard_idx] 330 return # Success 331 except (ConnectionError, TimeoutError) as e: 332 # Handle transient errors (e.g., network issues) 333 if attempt < max_retries - 1: 334 await asyncio.sleep( 335 retry_delay * (2**attempt) 336 ) # Exponential backoff 337 continue 338 else: 339 raise RuntimeError( 340 f"Failed to fetch shard {shard_idx} after {max_retries} attempts: {e}" 341 ) 342 343 def _parse_chunk_key(self, key: str) -> Optional[Tuple[int, ...]]: 344 # 1. 
Exclude .json files immediately (metadata) 345 if key.endswith(".json"): 346 return None 347 excluded_array_prefixes = { 348 "time", 349 "lat", 350 "lon", 351 "latitude", 352 "longitude", 353 "forecast_reference_time", 354 "step", 355 } 356 357 chunk_marker = "/c/" 358 marker_idx = key.rfind(chunk_marker) # Use rfind for robustness 359 if marker_idx == -1: 360 # Key does not contain "/c/", so it's not a chunk data key 361 # in the expected format (e.g., could be .zattrs, .zgroup at various levels). 362 return None 363 364 # Extract the part of the key before "/c/", which might represent the array/group path 365 # e.g., "temp" from "temp/c/0/0/0" 366 # e.g., "group1/lat" from "group1/lat/c/0" 367 # e.g., "" if key is "c/0/0/0" (root array) 368 path_before_c = key[:marker_idx] 369 370 # Determine the actual array name (the last component of the path before "/c/") 371 actual_array_name = "" 372 if path_before_c: 373 actual_array_name = path_before_c.split("/")[-1] 374 375 # If the determined array name is in our exclusion list, return None. 376 if actual_array_name in excluded_array_prefixes: 377 return None 378 379 # The part after "/c/" contains the chunk coordinates 380 coord_part = key[marker_idx + len(chunk_marker) :] 381 parts = coord_part.split("/") 382 383 coords = tuple(map(int, parts)) 384 # Validate coordinates against the chunk grid of the store's configured array 385 for i, c_coord in enumerate(coords): 386 if not (0 <= c_coord < self._chunks_per_dim[i]): 387 raise IndexError( 388 f"Chunk coordinate {c_coord} at dimension {i} is out of bounds for dimension size {self._chunks_per_dim[i]}." 
389 ) 390 return coords 391 392 def _get_linear_chunk_index(self, chunk_coords: Tuple[int, ...]) -> int: 393 linear_index = 0 394 multiplier = 1 395 # Convert N-D chunk coordinates to a flat 1-D index (row-major order) 396 for i in reversed(range(len(self._chunks_per_dim))): 397 linear_index += chunk_coords[i] * multiplier 398 multiplier *= self._chunks_per_dim[i] 399 return linear_index 400 401 def _get_shard_info(self, linear_chunk_index: int) -> Tuple[int, int]: 402 shard_idx = linear_chunk_index // self._chunks_per_shard 403 index_in_shard = linear_chunk_index % self._chunks_per_shard 404 return shard_idx, index_in_shard 405 406 async def _load_or_initialize_shard_cache( 407 self, shard_idx: int 408 ) -> List[Optional[CID]]: 409 """ 410 Load a shard into the cache or initialize an empty shard if it doesn't exist. 411 412 Args: 413 shard_idx: The index of the shard to load or initialize. 414 415 Returns: 416 List[Optional[CID]]: The shard data (list of CIDs or None). 417 418 Raises: 419 ValueError: If the shard index is out of bounds. 420 RuntimeError: If the shard cannot be loaded or initialized. 421 """ 422 started_at = time.perf_counter() 423 cached_shard = await self._shard_data_cache.get(shard_idx) 424 if cached_shard is not None: 425 instrumentation.record_shard_load( 426 shard_idx=shard_idx, 427 cache_hit=True, 428 seconds=time.perf_counter() - started_at, 429 entries=len(cached_shard), 430 ) 431 return cached_shard 432 433 if shard_idx in self._pending_shard_loads: 434 try: 435 # Wait for the pending load with a timeout (e.g., 60 seconds) 436 await asyncio.wait_for( 437 self._pending_shard_loads[shard_idx].wait(), timeout=60.0 438 ) 439 cached_shard = await self._shard_data_cache.get(shard_idx) 440 if cached_shard is not None: 441 return cached_shard 442 else: 443 raise RuntimeError( 444 f"Shard {shard_idx} not found in cache after pending load completed." 
445 ) 446 except asyncio.TimeoutError: 447 # Clean up the pending load to allow retry 448 if shard_idx in self._pending_shard_loads: 449 self._pending_shard_loads[shard_idx].set() 450 del self._pending_shard_loads[shard_idx] 451 raise RuntimeError(f"Timeout waiting for shard {shard_idx} to load.") 452 453 if not (0 <= shard_idx < self._num_shards): 454 raise ValueError(f"Shard index {shard_idx} out of bounds.") 455 456 shard_cid_obj = self._root_obj["chunks"]["shard_cids"][shard_idx] 457 if shard_cid_obj: 458 self._pending_shard_loads[shard_idx] = asyncio.Event() 459 shard_cid_str = str(shard_cid_obj) 460 await self._fetch_and_cache_full_shard(shard_idx, shard_cid_str) 461 else: 462 empty_shard = [None] * self._chunks_per_shard 463 await self._shard_data_cache.put(shard_idx, empty_shard) 464 465 result = await self._shard_data_cache.get(shard_idx) 466 if result is None: 467 raise RuntimeError(f"Failed to load or initialize shard {shard_idx}") 468 instrumentation.record_shard_load( 469 shard_idx=shard_idx, 470 cache_hit=False, 471 seconds=time.perf_counter() - started_at, 472 entries=len(result), 473 ) 474 return result # type: ignore[return-value] 475 476 async def set_partial_values( 477 self, key_start_values: Iterable[Tuple[str, int, BytesLike]] 478 ) -> None: 479 raise NotImplementedError( 480 "Partial writes are not supported by ShardedZarrStore." 481 ) 482 483 async def get_partial_values( 484 self, 485 prototype: zarr.core.buffer.BufferPrototype, 486 key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]], 487 ) -> List[Optional[zarr.core.buffer.Buffer]]: 488 tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges] 489 results = await asyncio.gather(*tasks) 490 return results 491 492 def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore": 493 """ 494 Return this store (if the flag already matches) or a *shallow* 495 clone that presents the requested read‑only status. 
496 497 The clone **shares** the same CAS instance and internal state; 498 no flushing, network traffic or async work is done. 499 """ 500 # Fast path 501 if read_only == self.read_only: 502 return self # Same mode, return same instance 503 504 # Create new instance with different read_only flag 505 # Creates a *bare* instance without running its __init__ 506 clone = type(self).__new__(type(self)) 507 508 # Copy all attributes from the current instance 509 clone.cas = self.cas 510 clone._root_cid = self._root_cid 511 clone._root_obj = self._root_obj 512 513 clone._resize_lock = self._resize_lock 514 clone._resize_complete = self._resize_complete 515 clone._shard_locks = self._shard_locks 516 517 clone._shard_data_cache = self._shard_data_cache 518 clone._pending_shard_loads = self._pending_shard_loads 519 clone._metadata_read_cache = self._metadata_read_cache 520 521 clone._array_shape = self._array_shape 522 clone._chunk_shape = self._chunk_shape 523 clone._chunks_per_dim = self._chunks_per_dim 524 clone._chunks_per_shard = self._chunks_per_shard 525 clone._num_shards = self._num_shards 526 clone._total_chunks = self._total_chunks 527 528 clone._dirty_root = self._dirty_root 529 530 # Re‑initialise the zarr base class so that Zarr sees the flag 531 zarr.abc.store.Store.__init__(clone, read_only=read_only) 532 return clone 533 534 def __eq__(self, other: object) -> bool: 535 if not isinstance(other, ShardedZarrStore): 536 return False 537 # For equality, root CID is primary. Config like chunks_per_shard is part of that root's identity. 538 return self._root_cid == other._root_cid 539 540 # If nothing to flush, return the root CID. 
541 async def flush(self) -> str: 542 async with self._shard_data_cache._cache_lock: 543 dirty_shards = list(self._shard_data_cache._dirty_shards) 544 if dirty_shards: 545 for shard_idx in sorted(dirty_shards): 546 # Get the list of CIDs/Nones from the cache 547 shard_data_list = await self._shard_data_cache.get(shard_idx) 548 if shard_data_list is None: 549 raise RuntimeError(f"Dirty shard {shard_idx} not found in cache") 550 551 # Encode this list into a DAG-CBOR byte representation 552 shard_data_bytes = dag_cbor.encode(shard_data_list) 553 554 # Save the DAG-CBOR block and get its CID 555 new_shard_cid_obj = await self.cas.save( 556 shard_data_bytes, 557 codec="dag-cbor", # Use 'dag-cbor' codec 558 ) 559 560 if ( 561 self._root_obj["chunks"]["shard_cids"][shard_idx] 562 != new_shard_cid_obj 563 ): 564 # Store the CID object directly 565 self._root_obj["chunks"]["shard_cids"][shard_idx] = ( 566 new_shard_cid_obj 567 ) 568 self._dirty_root = True 569 # Mark shard as clean after flushing 570 await self._shard_data_cache.mark_clean(shard_idx) 571 572 if self._dirty_root: 573 # Ensure all metadata CIDs are CID objects for correct encoding 574 self._root_obj["metadata"] = { 575 k: (CID.decode(v) if isinstance(v, str) else v) 576 for k, v in self._root_obj["metadata"].items() 577 } 578 root_obj_bytes = dag_cbor.encode(self._root_obj) 579 new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor") 580 self._root_cid = str(new_root_cid) 581 self._dirty_root = False 582 583 # Ignore because root_cid will always exist after initialization or flush. 
584 return self._root_cid # type: ignore[return-value] 585 586 async def get( 587 self, 588 key: str, 589 prototype: zarr.core.buffer.BufferPrototype, 590 byte_range: Optional[zarr.abc.store.ByteRequest] = None, 591 ) -> Optional[zarr.core.buffer.Buffer]: 592 with instrumentation.span( 593 "py_hamt.sharded_store.get", 594 { 595 "py_hamt.zarr.key": key, 596 "py_hamt.zarr.byte_range": byte_range is not None, 597 }, 598 ): 599 started_at = time.perf_counter() 600 hit = False 601 kind = "metadata" 602 shard_idx_for_trace: int | None = None 603 chunk_coords = self._parse_chunk_key(key) 604 try: 605 # Metadata request 606 if chunk_coords is None: 607 metadata_cid_obj = self._root_obj["metadata"].get(key) 608 if metadata_cid_obj is None: 609 return None 610 if byte_range is not None: 611 raise ValueError( 612 "Byte range requests are not supported for metadata keys." 613 ) 614 data = self._metadata_read_cache.get(key) 615 if data is None: 616 data = await self.cas.load(str(metadata_cid_obj)) 617 self._metadata_read_cache[key] = data 618 hit = True 619 return prototype.buffer.from_bytes(data) 620 # Chunk data request 621 kind = "chunk" 622 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 623 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 624 shard_idx_for_trace = shard_idx 625 626 # This will load the full shard into cache if it's not already there. 627 shard_lock = self._shard_locks[shard_idx] 628 async with shard_lock: 629 target_shard_list = await self._load_or_initialize_shard_cache( 630 shard_idx 631 ) 632 633 # Get the CID object (or None) from the cached list. 634 chunk_cid_obj = target_shard_list[index_in_shard] 635 636 if chunk_cid_obj is None: 637 return None # Chunk is empty/doesn't exist. 
638 639 chunk_cid_str = str(chunk_cid_obj) 640 641 req_offset = None 642 req_length = None 643 req_suffix = None 644 645 if byte_range: 646 if isinstance(byte_range, RangeByteRequest): 647 req_offset = byte_range.start 648 if byte_range.end is not None: 649 if byte_range.start > byte_range.end: 650 raise ValueError( 651 f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})" 652 ) 653 req_length = byte_range.end - byte_range.start 654 elif isinstance(byte_range, OffsetByteRequest): 655 req_offset = byte_range.offset 656 elif isinstance(byte_range, SuffixByteRequest): 657 req_suffix = byte_range.suffix 658 data = await self.cas.load( 659 chunk_cid_str, 660 offset=req_offset, 661 length=req_length, 662 suffix=req_suffix, 663 ) 664 hit = True 665 return prototype.buffer.from_bytes(data) 666 finally: 667 instrumentation.record_zarr_get( 668 store="sharded_store", 669 key=key, 670 kind=kind, 671 hit=hit, 672 seconds=time.perf_counter() - started_at, 673 byte_range=byte_range is not None, 674 shard_idx=shard_idx_for_trace, 675 ) 676 677 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 678 if self.read_only: 679 raise PermissionError("Cannot write to a read-only store.") 680 await self._resize_complete.wait() 681 682 if key.endswith("zarr.json") and not key == "zarr.json": 683 metadata_json = json.loads(value.to_bytes().decode("utf-8")) 684 new_array_shape = metadata_json.get("shape") 685 # Some metadata entries (e.g., group metadata) do not have a shape field. 686 if new_array_shape: 687 # Only resize when the metadata shape represents the primary array. 688 if ( 689 len(new_array_shape) == len(self._array_shape) 690 and tuple(new_array_shape) != self._array_shape 691 ): 692 async with self._resize_lock: 693 # Double-check after acquiring the lock, in case another task 694 # just finished this exact resize while we were waiting. 
695 if ( 696 len(new_array_shape) == len(self._array_shape) 697 and tuple(new_array_shape) != self._array_shape 698 ): 699 # Block all other tasks until resize is complete. 700 self._resize_complete.clear() 701 try: 702 await self.resize_store( 703 new_shape=tuple(new_array_shape) 704 ) 705 finally: 706 # All waiting tasks will now un-pause and proceed safely. 707 self._resize_complete.set() 708 709 raw_data_bytes = value.to_bytes() 710 # Save the data to CAS first to get its CID. 711 # Metadata is often saved as 'raw', chunks as well unless compressed. 712 try: 713 data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw") 714 await self.set_pointer(key, str(data_cid_obj)) 715 if self._parse_chunk_key(key) is None: 716 self._metadata_read_cache[key] = raw_data_bytes 717 except Exception as e: 718 raise RuntimeError(f"Failed to save data for key {key}: {e}") 719 return None # type: ignore[return-value] 720 721 async def set_pointer(self, key: str, pointer: str) -> None: 722 chunk_coords = self._parse_chunk_key(key) 723 724 pointer_cid_obj = CID.decode(pointer) # Convert string to CID object 725 726 if chunk_coords is None: # Metadata key 727 self._root_obj["metadata"][key] = pointer_cid_obj 728 self._dirty_root = True 729 return None 730 731 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 732 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 733 734 shard_lock = self._shard_locks[shard_idx] 735 async with shard_lock: 736 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 737 738 if target_shard_list[index_in_shard] != pointer_cid_obj: 739 target_shard_list[index_in_shard] = pointer_cid_obj 740 await self._shard_data_cache.mark_dirty(shard_idx) 741 return None 742 743 async def exists(self, key: str) -> bool: 744 try: 745 chunk_coords = self._parse_chunk_key(key) 746 if chunk_coords is None: # Metadata 747 return key in self._root_obj.get("metadata", {}) 748 linear_chunk_index = 
self._get_linear_chunk_index(chunk_coords) 749 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 750 # Load shard if not cached and check the index 751 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 752 return target_shard_list[index_in_shard] is not None 753 except (ValueError, IndexError, KeyError): 754 return False 755 756 @property 757 def supports_writes(self) -> bool: 758 return not self.read_only 759 760 @property 761 def supports_partial_writes(self) -> bool: 762 return False # Each chunk CID is written atomically into a shard slot 763 764 @property 765 def supports_deletes(self) -> bool: 766 return not self.read_only 767 768 async def delete(self, key: str) -> None: 769 if self.read_only: 770 raise PermissionError("Cannot delete from a read-only store.") 771 772 chunk_coords = self._parse_chunk_key(key) 773 if chunk_coords is None: # Metadata 774 # Coordinate/metadata deletions should be idempotent for caller convenience. 775 if self._root_obj["metadata"].pop(key, None) is not None: 776 self._metadata_read_cache.pop(key, None) 777 self._dirty_root = True 778 return None 779 780 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 781 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 782 783 shard_lock = self._shard_locks[shard_idx] 784 async with shard_lock: 785 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 786 if target_shard_list[index_in_shard] is not None: 787 target_shard_list[index_in_shard] = None 788 await self._shard_data_cache.mark_dirty(shard_idx) 789 790 @property 791 def supports_listing(self) -> bool: 792 return True 793 794 async def list(self) -> AsyncIterator[str]: 795 for key in list(self._root_obj.get("metadata", {})): 796 yield key 797 798 async def list_prefix(self, prefix: str) -> AsyncIterator[str]: 799 async for key in self.list(): 800 if key.startswith(prefix): 801 yield key 802 803 async def graft_store(self, store_to_graft_cid: 
str, chunk_offset: Tuple[int, ...]): 804 if self.read_only: 805 raise PermissionError("Cannot graft onto a read-only store.") 806 807 store_to_graft = await ShardedZarrStore.open( 808 cas=self.cas, read_only=True, root_cid=store_to_graft_cid 809 ) 810 source_chunk_grid = store_to_graft._chunks_per_dim 811 for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]): 812 linear_local_index = store_to_graft._get_linear_chunk_index(local_coords) 813 local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info( 814 linear_local_index 815 ) 816 # Load the source shard into its cache 817 source_shard_list = await store_to_graft._load_or_initialize_shard_cache( 818 local_shard_idx 819 ) 820 821 pointer_cid_obj = source_shard_list[index_in_local_shard] 822 if pointer_cid_obj is None: 823 continue 824 825 # Calculate global coordinates and write to the main store's index 826 global_coords = tuple( 827 c_local + c_offset 828 for c_local, c_offset in zip(local_coords, chunk_offset) 829 ) 830 linear_global_index = self._get_linear_chunk_index(global_coords) 831 global_shard_idx, index_in_global_shard = self._get_shard_info( 832 linear_global_index 833 ) 834 835 shard_lock = self._shard_locks[global_shard_idx] 836 async with shard_lock: 837 target_shard_list = await self._load_or_initialize_shard_cache( 838 global_shard_idx 839 ) 840 if target_shard_list[index_in_global_shard] != pointer_cid_obj: 841 target_shard_list[index_in_global_shard] = pointer_cid_obj 842 await self._shard_data_cache.mark_dirty(global_shard_idx) 843 844 async def resize_store(self, new_shape: Tuple[int, ...]): 845 """ 846 Resizes the store's main shard index to accommodate a new overall array shape. 847 This is a metadata-only operation on the store's root object. 848 Used when doing skeleton writes or appends via xarray where the array shape changes. 
849 """ 850 if self.read_only: 851 raise PermissionError("Cannot resize a read-only store.") 852 if ( 853 # self._root_obj is None 854 self._chunk_shape is None 855 or self._chunks_per_shard is None 856 or self._array_shape is None 857 ): 858 raise RuntimeError("Store is not properly initialized for resizing.") 859 if len(new_shape) != len(self._array_shape): 860 raise ValueError( 861 "New shape must have the same number of dimensions as the old shape." 862 ) 863 864 self._array_shape = tuple(new_shape) 865 self._chunks_per_dim = tuple( 866 math.ceil(a / c) if c > 0 else 0 867 for a, c in zip(self._array_shape, self._chunk_shape) 868 ) 869 self._total_chunks = math.prod(self._chunks_per_dim) 870 old_num_shards = self._num_shards if self._num_shards is not None else 0 871 self._num_shards = ( 872 (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard 873 if self._total_chunks > 0 874 else 0 875 ) 876 self._root_obj["chunks"]["array_shape"] = list(self._array_shape) 877 if self._num_shards > old_num_shards: 878 self._root_obj["chunks"]["shard_cids"].extend( 879 [None] * (self._num_shards - old_num_shards) 880 ) 881 elif self._num_shards < old_num_shards: 882 self._root_obj["chunks"]["shard_cids"] = self._root_obj["chunks"][ 883 "shard_cids" 884 ][: self._num_shards] 885 886 self._dirty_root = True 887 888 async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]): 889 """ 890 Resizes the Zarr metadata for a specific variable (e.g., '.json' file). 891 This does NOT change the store's main shard index. 892 """ 893 if self.read_only: 894 raise PermissionError("Cannot resize a read-only store.") 895 896 zarr_metadata_key = f"{variable_name}/zarr.json" 897 898 old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key) 899 if not old_zarr_metadata_cid: 900 raise KeyError( 901 f"Cannot find metadata for key '{zarr_metadata_key}' to resize." 
902 ) 903 904 old_zarr_metadata_bytes = await self.cas.load(old_zarr_metadata_cid) 905 zarr_metadata_json = json.loads(old_zarr_metadata_bytes) 906 907 zarr_metadata_json["shape"] = list(new_shape) 908 909 new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode( 910 "utf-8" 911 ) 912 # Metadata is a raw blob of bytes 913 new_zarr_metadata_cid = await self.cas.save( 914 new_zarr_metadata_bytes, codec="raw" 915 ) 916 917 self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid 918 self._metadata_read_cache[zarr_metadata_key] = new_zarr_metadata_bytes 919 self._dirty_root = True 920 921 async def list_dir(self, prefix: str) -> AsyncIterator[str]: 922 seen: Set[str] = set() 923 if prefix == "": 924 async for key in self.list(): # Iterates metadata keys 925 # e.g., if key is "group1/.zgroup" or "array1/.json", first_component is "group1" or "array1" 926 # if key is ".zgroup", first_component is ".zgroup" 927 first_component = key.split("/", 1)[0] 928 if first_component not in seen: 929 seen.add(first_component) 930 yield first_component 931 else: 932 raise NotImplementedError("Listing with a prefix is not implemented yet.")
Implements the Zarr Store API using a sharded layout for chunk CIDs.
This store divides the flat index of chunk CIDs into multiple "shards". Each shard is a DAG-CBOR array where each element is either a CID link to a chunk or a null value if the chunk is empty. This structure allows for efficient traversal by IPLD-aware systems.
The store's root object contains:
- A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
- A list of CIDs, where each CID points to a shard object.
- Sharding configuration details (e.g., chunks_per_shard).
def __init__(
    self,
    cas: ContentAddressedStore,
    read_only: bool,
    root_cid: Optional[str] = None,
    *,
    max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
):
    """
    Set up the in-memory bookkeeping of the store; performs no CAS access.

    Use the async `open()` classmethod to instantiate this class: it is
    `open()` that loads or creates the root object this constructor only
    declares.

    Args:
        cas: Content addressed store used for every load and save.
        read_only: Forwarded to the base-class initializer.
        root_cid: CID of an existing root object, or None for a new store.
        max_cache_memory_bytes: Budget handed to the shard LRU cache.
    """
    super().__init__(read_only=read_only)
    self.cas = cas
    self._root_cid = root_cid
    # Declared only; assigned once the root is loaded or initialized.
    self._root_obj: dict

    self._resize_lock = asyncio.Lock()
    # An event to signal when a resize is in-progress.
    # It starts in the "set" state, allowing all operations to proceed.
    self._resize_complete = asyncio.Event()
    self._resize_complete.set()
    # One lock per shard index, created lazily on first access.
    self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)

    self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
    self._pending_shard_loads: Dict[int, asyncio.Event] = {}
    # Raw bytes of metadata documents already read or written, keyed by key.
    self._metadata_read_cache: Dict[str, bytes] = {}

    # Geometry attributes: declared here, assigned later (not in this method).
    self._array_shape: Tuple[int, ...]
    self._chunk_shape: Tuple[int, ...]
    self._chunks_per_dim: Tuple[int, ...]
    self._chunks_per_shard: int
    self._num_shards: int = 0
    self._total_chunks: int = 0

    # True when the root object has changes that have not been saved.
    self._dirty_root = False
Use the async open() classmethod to instantiate this class.
@classmethod
async def open(
    cls,
    cas: ContentAddressedStore,
    read_only: bool,
    root_cid: Optional[str] = None,
    *,
    array_shape: Optional[Tuple[int, ...]] = None,
    chunk_shape: Optional[Tuple[int, ...]] = None,
    chunks_per_shard: Optional[int] = None,
    max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
) -> "ShardedZarrStore":
    """
    Asynchronously open an existing ShardedZarrStore or initialize a new one.

    When `root_cid` is given the root object is loaded from the CAS and the
    geometry arguments are not consulted. Otherwise a new, writable store is
    initialized from `array_shape`, `chunk_shape` and `chunks_per_shard`.

    Raises:
        ValueError: if no `root_cid` is given and the store is read-only, if
            `array_shape` or `chunk_shape` is missing for a new store, or if
            `chunks_per_shard` is not a positive integer.
    """
    store = cls(
        cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
    )

    if root_cid:
        await store._load_root_from_cid()
        return store

    if read_only:
        # Nothing to read from and not allowed to create anything.
        raise ValueError("root_cid must be provided for a read-only store.")

    if array_shape is None or chunk_shape is None:
        raise ValueError(
            "array_shape and chunk_shape must be provided for a new store."
        )

    if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0:
        raise ValueError("chunks_per_shard must be a positive integer.")

    store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
    return store
Asynchronously opens an existing ShardedZarrStore or initializes a new one.
async def set_partial_values(
    self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
) -> None:
    """
    Reject partial writes; this store does not implement them.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(
        "Partial writes are not supported by ShardedZarrStore."
    )
Store values at a given key, starting at byte range_start.
Parameters
key_start_values : list[tuple[str, int, BytesLike]] set of key, range_start, values triples, a key may occur multiple times with different range_starts, range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key
async def get_partial_values(
    self,
    prototype: zarr.core.buffer.BufferPrototype,
    key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
) -> List[Optional[zarr.core.buffer.Buffer]]:
    """
    Fetch several (key, byte_range) requests concurrently via `get`.

    Returns:
        One entry per request, in the order of `key_ranges`; each entry is
        whatever `get` returned for that request.
    """
    requests = [
        self.get(key, prototype, byte_range) for key, byte_range in key_ranges
    ]
    # gather preserves the order of its arguments in the returned list.
    return await asyncio.gather(*requests)
Retrieve possibly partial values from given key_ranges.
Parameters
prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]] Ordered set of key, range pairs, a key may occur multiple times with different ranges
Returns
list of values, in the order of the key_ranges, may contain null/none for missing keys
def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
    """
    Return this store (if the flag already matches) or a *shallow*
    clone that presents the requested read-only status.

    The clone refers to the same CAS instance, root object, locks and caches
    as the original; no flushing, network traffic or async work is done.
    Plain values (the geometry tuples, the counters, `_root_cid` and
    `_dirty_root`) are copied as they are at call time, so rebinding one of
    them on either instance afterwards is not seen by the other.
    """
    # Fast path: same mode, return the same instance.
    if read_only == self.read_only:
        return self

    # Create a *bare* instance without running __init__.
    clone = type(self).__new__(type(self))

    # Copy every attribute by reference from the current instance.
    for attr in (
        "cas",
        "_root_cid",
        "_root_obj",
        "_resize_lock",
        "_resize_complete",
        "_shard_locks",
        "_shard_data_cache",
        "_pending_shard_loads",
        "_metadata_read_cache",
        "_array_shape",
        "_chunk_shape",
        "_chunks_per_dim",
        "_chunks_per_shard",
        "_num_shards",
        "_total_chunks",
        "_dirty_root",
    ):
        setattr(clone, attr, getattr(self, attr))

    # Re-initialise the zarr base class so that Zarr sees the flag.
    zarr.abc.store.Store.__init__(clone, read_only=read_only)
    return clone
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same CAS instance and internal state; no flushing, network traffic or async work is done.
async def flush(self) -> str:
    """
    Persist every dirty shard and, if needed, the root object to the CAS.

    Each dirty shard is encoded as DAG-CBOR and saved, and its CID is recorded
    in the root object's shard list. The root object is re-encoded and saved
    only when something in it changed.

    Returns:
        The root CID as a string.

    Raises:
        RuntimeError: if a shard marked dirty is no longer in the cache.
    """
    # Snapshot the dirty set under the cache lock, then release it.
    # NOTE(review): nesting reconstructed from a flattened listing; the loop
    # is placed outside the lock on the assumption that the cache's own
    # methods acquire it - confirm against the original file.
    async with self._shard_data_cache._cache_lock:
        dirty_shards = sorted(self._shard_data_cache._dirty_shards)

    for shard_idx in dirty_shards:
        # Get the list of CIDs/Nones from the cache
        shard_data_list = await self._shard_data_cache.get(shard_idx)
        if shard_data_list is None:
            raise RuntimeError(f"Dirty shard {shard_idx} not found in cache")

        # Encode this list into a DAG-CBOR block and save it to get its CID.
        shard_data_bytes = dag_cbor.encode(shard_data_list)
        new_shard_cid_obj = await self.cas.save(
            shard_data_bytes,
            codec="dag-cbor",
        )

        # The shard list is looked up afresh each time: a resize may have
        # replaced it while this coroutine was suspended.
        if self._root_obj["chunks"]["shard_cids"][shard_idx] != new_shard_cid_obj:
            # Store the CID object directly
            self._root_obj["chunks"]["shard_cids"][shard_idx] = new_shard_cid_obj
            self._dirty_root = True
        # Mark shard as clean after flushing
        await self._shard_data_cache.mark_clean(shard_idx)

    if self._dirty_root:
        # Ensure all metadata CIDs are CID objects for correct encoding
        self._root_obj["metadata"] = {
            k: (CID.decode(v) if isinstance(v, str) else v)
            for k, v in self._root_obj["metadata"].items()
        }
        root_obj_bytes = dag_cbor.encode(self._root_obj)
        new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor")
        self._root_cid = str(new_root_cid)
        self._dirty_root = False

    # Ignore because root_cid will always exist after initialization or flush.
    return self._root_cid  # type: ignore[return-value]
async def get(
    self,
    key: str,
    prototype: zarr.core.buffer.BufferPrototype,
    byte_range: Optional[zarr.abc.store.ByteRequest] = None,
) -> Optional[zarr.core.buffer.Buffer]:
    """
    Retrieve the value stored under `key`, optionally limited to a byte range.

    A key for which `_parse_chunk_key` returns None is treated as metadata:
    its CID is looked up in the root object's "metadata" mapping and the bytes
    are served from (and added to) an in-memory cache. Any other key is
    resolved through the shard index to a chunk CID, which is then loaded.

    Returns:
        A buffer created from `prototype`, or None when the key has no entry.

    Raises:
        ValueError: if a byte range is requested for an existing metadata key,
            or a RangeByteRequest has a start greater than its end.
    """
    with instrumentation.span(
        "py_hamt.sharded_store.get",
        {
            "py_hamt.zarr.key": key,
            "py_hamt.zarr.byte_range": byte_range is not None,
        },
    ):
        started_at = time.perf_counter()
        hit = False
        kind = "metadata"
        shard_idx_for_trace: int | None = None
        chunk_coords = self._parse_chunk_key(key)
        try:
            # Metadata request
            if chunk_coords is None:
                metadata_cid_obj = self._root_obj["metadata"].get(key)
                if metadata_cid_obj is None:
                    return None
                if byte_range is not None:
                    raise ValueError(
                        "Byte range requests are not supported for metadata keys."
                    )
                data = self._metadata_read_cache.get(key)
                if data is None:
                    data = await self.cas.load(str(metadata_cid_obj))
                    self._metadata_read_cache[key] = data
                hit = True
                return prototype.buffer.from_bytes(data)

            # Chunk data request
            kind = "chunk"
            linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
            shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
            shard_idx_for_trace = shard_idx

            # This will load the full shard into cache if it's not already there.
            shard_lock = self._shard_locks[shard_idx]
            async with shard_lock:
                target_shard_list = await self._load_or_initialize_shard_cache(
                    shard_idx
                )

            # Get the CID object (or None) from the cached list.
            chunk_cid_obj = target_shard_list[index_in_shard]
            if chunk_cid_obj is None:
                return None  # Chunk is empty/doesn't exist.

            chunk_cid_str = str(chunk_cid_obj)

            # Translate the zarr byte request into the CAS load arguments.
            req_offset = None
            req_length = None
            req_suffix = None
            if byte_range:
                if isinstance(byte_range, RangeByteRequest):
                    req_offset = byte_range.start
                    if byte_range.end is not None:
                        if byte_range.start > byte_range.end:
                            raise ValueError(
                                f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})"
                            )
                        req_length = byte_range.end - byte_range.start
                elif isinstance(byte_range, OffsetByteRequest):
                    req_offset = byte_range.offset
                elif isinstance(byte_range, SuffixByteRequest):
                    req_suffix = byte_range.suffix

            data = await self.cas.load(
                chunk_cid_str,
                offset=req_offset,
                length=req_length,
                suffix=req_suffix,
            )
            hit = True
            return prototype.buffer.from_bytes(data)
        finally:
            # Recorded on every exit path, including misses and errors.
            instrumentation.record_zarr_get(
                store="sharded_store",
                key=key,
                kind=kind,
                hit=hit,
                seconds=time.perf_counter() - started_at,
                byte_range=byte_range is not None,
                shard_idx=shard_idx_for_trace,
            )
Retrieve the value associated with a given key.
Parameters
key : str prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. byte_range : ByteRequest, optional ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved. - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned. - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header. - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.
Returns
Buffer
async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
    """
    Store `value` under `key`.

    The bytes are saved to the CAS as a raw block and the resulting CID is
    recorded through `set_pointer`. When the key is a non-root `zarr.json`
    whose "shape" has the same number of dimensions as the store's array shape
    but a different value, the shard index is resized first.

    Raises:
        PermissionError: if the store is read-only.
        RuntimeError: if saving the data or recording the pointer fails; the
            original exception is attached as the cause.
    """
    if self.read_only:
        raise PermissionError("Cannot write to a read-only store.")
    # Do not proceed while another task is in the middle of a resize.
    await self._resize_complete.wait()

    # Materialize the bytes once; they are needed for both paths below.
    raw_data_bytes = value.to_bytes()

    if key.endswith("zarr.json") and key != "zarr.json":
        metadata_json = json.loads(raw_data_bytes.decode("utf-8"))
        new_array_shape = metadata_json.get("shape")
        # Some metadata entries (e.g., group metadata) do not have a shape field.
        if new_array_shape:
            # Only resize when the metadata shape represents the primary array.
            if (
                len(new_array_shape) == len(self._array_shape)
                and tuple(new_array_shape) != self._array_shape
            ):
                async with self._resize_lock:
                    # Double-check after acquiring the lock, in case another task
                    # just finished this exact resize while we were waiting.
                    if (
                        len(new_array_shape) == len(self._array_shape)
                        and tuple(new_array_shape) != self._array_shape
                    ):
                        # Block all other tasks until resize is complete.
                        self._resize_complete.clear()
                        try:
                            await self.resize_store(
                                new_shape=tuple(new_array_shape)
                            )
                        finally:
                            # All waiting tasks will now un-pause and proceed safely.
                            self._resize_complete.set()

    # Save the data to CAS first to get its CID.
    # Metadata is often saved as 'raw', chunks as well unless compressed.
    try:
        data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
        await self.set_pointer(key, str(data_cid_obj))
        if self._parse_chunk_key(key) is None:
            # Keep the metadata cache in step with what was just written.
            self._metadata_read_cache[key] = raw_data_bytes
    except Exception as e:
        # Chain the cause so the underlying failure stays in the traceback.
        raise RuntimeError(f"Failed to save data for key {key}: {e}") from e
    return None  # type: ignore[return-value]
Store a (key, value) pair.
Parameters
key : str value : Buffer
async def set_pointer(self, key: str, pointer: str) -> None:
    """
    Record `pointer` (a CID string) as the content address for `key`.

    Metadata keys are written into the root object's "metadata" mapping; chunk
    keys are written into their slot of the owning shard, which is marked
    dirty only when the slot actually changes. No data is saved to the CAS.

    NOTE(review): unlike `set` and `delete`, this method has no read-only
    guard - confirm whether that is intentional.
    """
    chunk_coords = self._parse_chunk_key(key)

    pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object

    if chunk_coords is None:  # Metadata key
        self._root_obj["metadata"][key] = pointer_cid_obj
        # Any bytes cached for this key belong to the previous CID; drop them
        # so that a later `get` loads the content the new pointer refers to.
        self._metadata_read_cache.pop(key, None)
        self._dirty_root = True
        return None

    linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
    shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

    shard_lock = self._shard_locks[shard_idx]
    async with shard_lock:
        target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)

        # Skip the write (and the dirty mark) when nothing changes.
        if target_shard_list[index_in_shard] != pointer_cid_obj:
            target_shard_list[index_in_shard] = pointer_cid_obj
            await self._shard_data_cache.mark_dirty(shard_idx)
    return None
async def exists(self, key: str) -> bool:
    """
    Report whether `key` has an entry in the store.

    Metadata keys are checked against the root object's "metadata" mapping;
    chunk keys are checked against their slot in the owning shard, loading the
    shard if necessary. A ValueError, IndexError or KeyError raised while
    resolving the key is reported as "does not exist".
    """
    try:
        chunk_coords = self._parse_chunk_key(key)
        if chunk_coords is None:  # Metadata
            return key in self._root_obj.get("metadata", {})

        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
        # Load shard if not cached and check the index
        target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
        return target_shard_list[index_in_shard] is not None
    except (ValueError, IndexError, KeyError):
        return False
Check if a key exists in the store.
Parameters
key : str
Returns
bool
@property
def supports_partial_writes(self) -> bool:
    """Report that partial writes are not supported by this store."""
    # Each chunk CID is written atomically into a shard slot
    return False
Does the store support partial writes?
async def delete(self, key: str) -> None:
    """
    Remove `key` from the store's index.

    A metadata key is removed from the root object's "metadata" mapping and
    from the metadata cache. A chunk key has its shard slot set to None. In
    both cases nothing is marked dirty when there was nothing to remove.

    Raises:
        PermissionError: if the store is read-only.
    """
    if self.read_only:
        raise PermissionError("Cannot delete from a read-only store.")

    chunk_coords = self._parse_chunk_key(key)
    if chunk_coords is None:  # Metadata
        # Coordinate/metadata deletions should be idempotent for caller convenience.
        removed = self._root_obj["metadata"].pop(key, None)
        if removed is not None:
            self._metadata_read_cache.pop(key, None)
            self._dirty_root = True
        return None

    linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
    shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

    shard_lock = self._shard_locks[shard_idx]
    async with shard_lock:
        target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
        # Only touch (and dirty) the shard when the slot is occupied.
        if target_shard_list[index_in_shard] is not None:
            target_shard_list[index_in_shard] = None
            await self._shard_data_cache.mark_dirty(shard_idx)
Remove a key from the store
Parameters
key : str
async def list(self) -> AsyncIterator[str]:
    """
    Yield every key of the root object's "metadata" mapping.

    Only metadata keys are produced; this method does not consult the shard
    index. Yields nothing when the root object has no "metadata" entry.
    """
    # Iterate over a snapshot so that changes made to the mapping while the
    # consumer is suspended between items cannot invalidate the iteration.
    for key in tuple(self._root_obj.get("metadata", {})):
        yield key
Retrieve all keys in the store.
Returns
AsyncIterator[str]
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    """
    Yield every key produced by `list()` that starts with `prefix`.

    The match is a plain string-prefix test on the full key; keys are yielded
    unchanged.
    """
    async for key in self.list():
        if key.startswith(prefix):
            yield key
Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.
Parameters
prefix : str
Returns
AsyncIterator[str]
async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
    """
    Copy the chunk pointers of another sharded store into this one at an offset.

    The source store is opened read-only from `store_to_graft_cid` using this
    store's CAS. Every non-empty chunk slot of the source is written into this
    store's shard index at the source coordinates plus `chunk_offset`. Only
    pointers are copied; chunk data is not loaded.

    Args:
        store_to_graft_cid: Root CID of the store to graft in.
        chunk_offset: Per-dimension offset, in chunks, added to each source
            chunk coordinate.

    Raises:
        PermissionError: if this store is read-only.
        ValueError: if `chunk_offset` does not have one entry per dimension of
            the source chunk grid.
    """
    if self.read_only:
        raise PermissionError("Cannot graft onto a read-only store.")

    store_to_graft = await ShardedZarrStore.open(
        cas=self.cas, read_only=True, root_cid=store_to_graft_cid
    )
    source_chunk_grid = store_to_graft._chunks_per_dim

    # zip() below would silently drop dimensions on a length mismatch and
    # address the wrong chunks, so reject it up front.
    if len(chunk_offset) != len(source_chunk_grid):
        raise ValueError(
            "chunk_offset must have one entry per dimension of the grafted store."
        )

    for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]):
        linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
        local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
            linear_local_index
        )
        # Load the source shard into its cache
        source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
            local_shard_idx
        )

        pointer_cid_obj = source_shard_list[index_in_local_shard]
        if pointer_cid_obj is None:
            continue  # Empty slot in the source: nothing to graft.

        # Calculate global coordinates and write to the main store's index
        global_coords = tuple(
            c_local + c_offset
            for c_local, c_offset in zip(local_coords, chunk_offset)
        )
        linear_global_index = self._get_linear_chunk_index(global_coords)
        global_shard_idx, index_in_global_shard = self._get_shard_info(
            linear_global_index
        )

        shard_lock = self._shard_locks[global_shard_idx]
        async with shard_lock:
            target_shard_list = await self._load_or_initialize_shard_cache(
                global_shard_idx
            )
            # Skip the write (and the dirty mark) when nothing changes.
            if target_shard_list[index_in_global_shard] != pointer_cid_obj:
                target_shard_list[index_in_global_shard] = pointer_cid_obj
                await self._shard_data_cache.mark_dirty(global_shard_idx)
async def resize_store(self, new_shape: Tuple[int, ...]):
    """
    Resizes the store's main shard index to accommodate a new overall array shape.
    This is a metadata-only operation on the store's root object.
    Used when doing skeleton writes or appends via xarray where the array shape changes.

    The chunk grid, total chunk count and shard count are recomputed from
    `new_shape`; the root object's shard CID list is padded with None or
    truncated to the new shard count, and the root is marked dirty.

    NOTE(review): when the shard count shrinks, this method does not touch the
    shard cache; whether dropped shards can remain cached or dirty there
    cannot be determined from this method - verify.

    Raises:
        PermissionError: if the store is read-only.
        RuntimeError: if the store geometry has not been set up yet.
        ValueError: if `new_shape` has a different number of dimensions.
    """
    if self.read_only:
        raise PermissionError("Cannot resize a read-only store.")

    # getattr with a default: an attribute that has never been assigned must
    # lead to the RuntimeError below rather than an AttributeError.
    chunk_shape = getattr(self, "_chunk_shape", None)
    chunks_per_shard = getattr(self, "_chunks_per_shard", None)
    old_shape = getattr(self, "_array_shape", None)
    if chunk_shape is None or chunks_per_shard is None or old_shape is None:
        raise RuntimeError("Store is not properly initialized for resizing.")
    if len(new_shape) != len(old_shape):
        raise ValueError(
            "New shape must have the same number of dimensions as the old shape."
        )

    self._array_shape = tuple(new_shape)
    # Integer ceil-division; float division would lose precision for very
    # large extents.
    self._chunks_per_dim = tuple(
        -(-a // c) if c > 0 else 0 for a, c in zip(self._array_shape, chunk_shape)
    )
    self._total_chunks = math.prod(self._chunks_per_dim)

    old_num_shards = self._num_shards if self._num_shards is not None else 0
    if self._total_chunks > 0:
        self._num_shards = (
            self._total_chunks + chunks_per_shard - 1
        ) // chunks_per_shard
    else:
        self._num_shards = 0

    chunks_section = self._root_obj["chunks"]
    chunks_section["array_shape"] = list(self._array_shape)
    if self._num_shards > old_num_shards:
        # New shards start out without a CID.
        chunks_section["shard_cids"].extend(
            [None] * (self._num_shards - old_num_shards)
        )
    elif self._num_shards < old_num_shards:
        chunks_section["shard_cids"] = chunks_section["shard_cids"][
            : self._num_shards
        ]

    self._dirty_root = True
Resizes the store's main shard index to accommodate a new overall array shape. This is a metadata-only operation on the store's root object. Used when doing skeleton writes or appends via xarray where the array shape changes.
async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
    """
    Resizes the Zarr metadata for a specific variable (its
    '<variable_name>/zarr.json' document).
    This does NOT change the store's main shard index.

    The existing metadata document is loaded from the CAS, its "shape" field
    is replaced, and the re-serialized document is saved as a new raw block
    whose CID replaces the old one in the root object.

    Raises:
        PermissionError: if the store is read-only.
        KeyError: if the variable has no metadata entry in the root object.
    """
    if self.read_only:
        raise PermissionError("Cannot resize a read-only store.")

    zarr_metadata_key = f"{variable_name}/zarr.json"

    old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key)
    if not old_zarr_metadata_cid:
        raise KeyError(
            f"Cannot find metadata for key '{zarr_metadata_key}' to resize."
        )

    old_zarr_metadata_bytes = await self.cas.load(old_zarr_metadata_cid)
    zarr_metadata_json = json.loads(old_zarr_metadata_bytes)
    zarr_metadata_json["shape"] = list(new_shape)

    new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode(
        "utf-8"
    )
    # Metadata is a raw blob of bytes
    new_zarr_metadata_cid = await self.cas.save(
        new_zarr_metadata_bytes, codec="raw"
    )

    self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid
    # Keep the metadata cache in step with the document just written.
    self._metadata_read_cache[zarr_metadata_key] = new_zarr_metadata_bytes
    self._dirty_root = True
Resizes the Zarr metadata for a specific variable (i.e., its '<variable_name>/zarr.json' document). This does NOT change the store's main shard index.
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    """
    Yield the distinct first path components of the keys produced by `list()`.

    Only the empty prefix (the store root) is supported. Each component is
    yielded once, in the order it is first encountered.

    Raises:
        NotImplementedError: if `prefix` is not the empty string (raised when
            iteration starts).
    """
    if prefix != "":
        raise NotImplementedError("Listing with a prefix is not implemented yet.")

    seen: Set[str] = set()
    async for key in self.list():  # Iterates metadata keys
        # e.g. "group1/zarr.json" -> "group1"; a key without "/" (such as
        # "zarr.json") is its own first component.
        first_component = key.split("/", 1)[0]
        if first_component not in seen:
            seen.add(first_component)
            yield first_component
Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.
Parameters
prefix : str
Returns
AsyncIterator[str]
async def convert_hamt_to_sharded(
    cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
) -> str:
    """
    Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

    Only pointers are copied: for every key of the source HAMT its CID is
    written into the new store, so no chunk data is re-uploaded. The array and
    chunk shapes of the new store are taken from the first data variable of
    the source dataset.

    Args:
        cas: An initialized ContentAddressedStore instance (KuboCAS).
        hamt_root_cid: The root CID of the source ZarrHAMTStore.
        chunks_per_shard: The number of chunks to group into a single shard in the new store.

    Returns:
        The root CID of the newly created ShardedZarrStore.

    Raises:
        ValueError: if the source dataset has no data variables.
    """
    print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---")
    start_time = time.perf_counter()

    # 1. Open the source HAMT store for reading
    print("Opening source HAMT store...")
    hamt_ro = await HAMT.build(
        cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
    )
    source_store = ZarrHAMTStore(hamt_ro, read_only=True)
    source_dataset = xr.open_zarr(store=source_store, consolidated=True)

    # 2. Introspect the source array to get its configuration
    print("Reading metadata from source store...")

    # A bare next() here would raise StopIteration inside a coroutine, which
    # surfaces as an opaque RuntimeError; fail with a clear message instead.
    data_var_name = next(iter(source_dataset.data_vars), None)
    if data_var_name is None:
        raise ValueError("Source dataset contains no data variables to convert.")

    # Read the store's metadata to get array shape and chunk shape
    ordered_dims = list(source_dataset[data_var_name].dims)
    array_shape = tuple(source_dataset.sizes[dim] for dim in ordered_dims)
    chunk_shape = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims)

    # 3. Create the destination ShardedZarrStore for writing
    print(
        f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
    )
    dest_store = await ShardedZarrStore.open(
        cas=cas,
        read_only=False,
        array_shape=array_shape,
        chunk_shape=chunk_shape,
        chunks_per_shard=chunks_per_shard,
    )
    print("Destination store initialized.")

    # 4. Iterate and copy all data from source to destination
    print("Starting data migration...")
    count = 0
    async for key in hamt_ro.keys():
        count += 1
        # Read the pointer (CID) of the metadata or chunk from the source
        cid: CID = await hamt_ro.get_pointer(key)
        cid_base32_str = str(cid.encode("base32"))

        # Write the exact same key-pointer pair to the destination.
        await dest_store.set_pointer(key, cid_base32_str)
        if count % 200 == 0:  # pragma: no cover
            print(f"Migrated {count} keys...")  # pragma: no cover

    print(f"Migration of {count} total keys complete.")

    # 5. Finalize the new store by flushing it to the CAS
    print("Flushing new store to get final root CID...")
    new_root_cid = await dest_store.flush()
    end_time = time.perf_counter()

    print("\n--- Conversion Complete! ---")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"New ShardedZarrStore Root CID: {new_root_cid}")
    return new_root_cid
Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.
Args: cas: An initialized ContentAddressedStore instance (KuboCAS). hamt_root_cid: The root CID of the source ZarrHAMTStore. chunks_per_shard: The number of chunks to group into a single shard in the new store.
Returns: The root CID of the newly created ShardedZarrStore.