py_hamt
```python
from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .hamt_to_sharded_converter import convert_hamt_to_sharded, sharded_converter_cli
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
    "ShardedZarrStore",
    "convert_hamt_to_sharded",
    "sharded_converter_cli",
]
```
```python
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    This is the default blake3 hash function used for the `HAMT`, with a 32-byte hash size.
    """
    # 32 bytes is the recommended and default size for blake3, but multihash forces us to specify it explicitly
    digest: bytes = b3.digest(input_bytes, size=32)
    raw_bytes: bytes = b3.unwrap(digest)
    return raw_bytes
```
This is the default blake3 hash function used for the HAMT, with a 32-byte hash size.
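For illustration, the contract this function provides:

```python
from py_hamt import blake3_hashfn

digest = blake3_hashfn(b"some key")
assert isinstance(digest, bytes)
assert len(digest) == 32  # fixed 32-byte blake3 digest
```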
````python
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. In both modes, the HAMT uses some internal performance optimizations.

    Note that in read+write mode, the root node id IS NOT VALID. Call `make_read_only()` to convert to read-only mode and then read `root_node_id`.

    These optimizations trade memory use for performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a while to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id  # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, since that requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value mapping within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a python bytes object can only represent data in whole bytes.

        Theoretically the hash size only needs to be a minimum of 1 byte, and there can be at most as many hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to True if you are only going to store python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! When reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, which take more time to retrieve and decode from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read-only mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read-only or read/write mode.

        The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT, that is, passing in None for root_node_id. Method arguments are exactly the same as `__init__`. If root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async `__init__`, so this method is provided separately.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a massive blocking operation; you don't want to run this concurrently with a bunch of other operations
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read-only, which allows for more parallel read operations. The HAMT also needs to be in read-only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid; to read it at the end of your operations you must first use `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream, so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store.
        Takes a stack of nodes; we represent a stack with a list where the first element is the root element and the last element is the top of the stack.
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python.
        If a node ends up being empty, then it is deleted entirely, unless it is the root node.
        Modifies in place.
        """
        # iterate in the reverse direction, this range goes from n-1 to 0, from the bottommost tree node to the root
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                # When removing links, don't worry about two nodes having the same link since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Remove from our stack, continue reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # If not an empty node, just reserialize like normal and replace this one
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # If this is not the last i.e. root node, we need to change the linking of the node prior in the list since we just reserialized
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # automatically skip encoding if the value provided is of the bytes variety
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert
            # This is needed if any buckets overflow and so we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = []
            kvs_queue.append((key, val_ptr))

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    for k in bucket:
                        v_ptr = bucket[k]
                        kvs_queue.append((k, v_ptr))

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError

    async def get(
        self,
        key: str,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(
            pointer, offset=offset, length=length, suffix=suffix
        )
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can easily be used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        raw_hash: bytes = self.hash_fn(key.encode())

        current_id: IPLDKind = self.root_node_id
        current_depth: int = 0

        # Don't check if result is None but use a boolean to indicate finding something, since None is a possible value of IPLDKind
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id: IPLDKind = current_id
            top_node: Node = await self.node_store.load(top_id)
            map_key: int = extract_bits(raw_hash, current_depth, 8)

            # Check if this key is in one of the buckets
            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    result_ptr = bucket[key]
                    found_a_result = True
                    break

            if isinstance(item, list):
                link: IPLDKind = item[0]
                current_id = link
                current_depth += 1
                continue

            # Nowhere left to go, stop walking down the tree
            break

        if not found_a_result:
            raise KeyError

        return result_ptr

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(list(node.iter_links()))

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read-only mode, however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key-value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully calculated. If read-only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count
````
An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.
Use this to store arbitrarily large key-value mappings in your CAS of choice.
For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.
When in read-only mode, the HAMT is both async and thread safe.
A note about memory management, read+write and read-only modes
The HAMT can be in either read+write mode or read-only mode. In both modes, the HAMT uses some internal performance optimizations.
Note that in read+write mode, the root node id IS NOT VALID. Call make_read_only() to convert to read-only mode and then read root_node_id.
These optimizations trade memory use for performance. Use cache_size to monitor the approximate memory usage; be warned that for large key-value mapping sets this may take a while to run. Use cache_vacate if you are over your memory limits.
IPFS HAMT Sample Code
```python
kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id  # our root node CID
print(cid)
```
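Building on the sample above, a sketch of reopening the same mapping later from its root CID; `root_cid` stands in for whatever CID was printed earlier:

```python
# Reopen an existing HAMT in read-only mode from a previously recorded root CID.
kubo_cas = KuboCAS()
reopened = await HAMT.build(cas=kubo_cas, root_node_id=root_cid, read_only=True)
print(await reopened.get("foo"))  # "bar"
await kubo_cas.aclose()
```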
```python
    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, since that requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value mapping within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a python bytes object can only represent data in whole bytes.

        Theoretically the hash size only needs to be a minimum of 1 byte, and there can be at most as many hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to True if you are only going to store python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! When reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, which take more time to retrieve and decode from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read-only mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read-only or read/write mode.

        The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)
```
Use `build` if you need to create a completely empty HAMT, since that requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
The backing storage system. py-hamt provides an implementation KuboCAS for IPFS.
This is the hash function used to place a key-value within the HAMT.
To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.
It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a python bytes object can only represent data in whole bytes.
Theoretically the hash size only needs to be a minimum of 1 byte, and there can be at most as many hash collisions as the bucket size. Any more and the HAMT will most likely throw errors.
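As a sketch, a custom hash function based on SHA-256 from the standard library (not part of py-hamt) that satisfies these requirements:

```python
import hashlib

def sha256_hashfn(input_bytes: bytes) -> bytes:
    # 32-byte digest, i.e. a whole number of bytes as required above
    return hashlib.sha256(input_bytes).digest()

# hamt = await HAMT.build(cas=my_cas, hash_fn=sha256_hashfn)
```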
Set this to True if you are only going to store python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.
This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.
This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, which take more time to retrieve and decode from your backing CAS.
This must be a positive integer with a minimum of 1.
This is type IPLDKind but the documentation generator pdoc mangles it a bit.
Read from this only when in read mode to get something valid!
Clients should NOT modify this.
This is here for checking whether the HAMT is in read only or read/write mode.
The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
```python
    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT, that is, passing in None for root_node_id. Method arguments are exactly the same as `__init__`. If root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async `__init__`, so this method is provided separately.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt
```
Use this if you are initializing a completely empty HAMT, that is, passing in None for root_node_id. Method arguments are exactly the same as `__init__`. If root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.
This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async `__init__`, so this method is provided separately.
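For example, a minimal sketch using the in-memory backend (run inside an async context):

```python
from py_hamt import HAMT, InMemoryCAS

cas = InMemoryCAS()
hamt = await HAMT.build(cas=cas)  # root_node_id is None, so an empty root node is saved first
await hamt.set("answer", 42)
assert (await hamt.get("answer")) == 42
```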
```python
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read-only, which allows for more parallel read operations. The HAMT also needs to be in read-only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)
```
Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
```python
    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid; to read it at the end of your operations you must first use `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream, so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)
```
Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid; to read it at the end of your operations you must first use make_read_only.
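A sketch of a typical write, read, write cycle, assuming an already constructed `hamt`:

```python
await hamt.set("a", 1)        # read+write mode
await hamt.make_read_only()   # flush buffered writes; root_node_id is now valid
root = hamt.root_node_id
await hamt.enable_write()     # root_node_id is invalid again until the next make_read_only()
await hamt.set("b", 2)
```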
```python
    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()
```
Returns the memory used by some internal performance optimization tools in bytes.
This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
Be warned that this may take a while to run for large HAMTs.
For more on memory management, see the HAMT class documentation.
```python
    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()
```
Vacate and completely empty out the internal read/write cache.
Be warned that this may take a while if there have been a lot of write operations.
For more on memory management, see the HAMT class documentation.
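As an illustration, a hypothetical memory-budget check that combines the two calls (the 256 MiB threshold is arbitrary):

```python
# Vacate the internal caches if they grow past a chosen budget
if await hamt.cache_size() > 256 * 1024 * 1024:
    await hamt.cache_vacate()
```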
```python
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)
```
Write a key-value mapping.
```python
    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError
```
Delete a key-value mapping.
```python
    async def get(
        self,
        key: str,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(
            pointer, offset=offset, length=length, suffix=suffix
        )
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)
```
Get a value.
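The optional `offset`, `length`, and `suffix` arguments are forwarded to `ContentAddressedStore.load` for partial reads. A sketch, assuming the HAMT was built with `values_are_bytes=True` so values are stored as raw bytes (partial reads of dag-cbor encoded values would return undecodable fragments):

```python
await hamt.set("blob", b"0123456789")
assert (await hamt.get("blob", offset=2, length=3)) == b"234"  # bytes 2 through 4
assert (await hamt.get("blob", offset=7)) == b"789"            # from byte 7 to the end
assert (await hamt.get("blob", suffix=4)) == b"6789"           # last 4 bytes
```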
```python
    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can easily be used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer
```
Get a store ID that points to the value for this key.
This is useful for some applications that want to implement a read cache. Due to the restrictions of ContentAddressedStore on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in ZarrHAMTStore for example.
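A sketch of the read-cache pattern this enables; the `value_cache` dict is illustrative and not part of py-hamt:

```python
value_cache: dict = {}

async def cached_get(hamt, key: str):
    pointer = await hamt.get_pointer(key)
    # Pointers are immutable IDs, so they are safe to use as cache keys
    if pointer not in value_cache:
        value_cache[pointer] = await hamt.get(key)
    return value_cache[pointer]
```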
```python
    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read-only mode, however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k
```
AsyncIterator returning all keys in the HAMT.
If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
When the HAMT is in read only mode however, this can be run concurrently with get operations.
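For example, iterating over all keys and counting them, assuming an existing `hamt` and an async context:

```python
async for key in hamt.keys():
    print(key)

total = await hamt.len()  # equivalent to counting the keys yourself
print(total)
```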
```python
    async def len(self) -> int:
        """
        Return the number of key-value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully calculated. If read-only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count
```
Return the number of key-value mappings in this HAMT.
When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully calculated. If read-only, then this can be run concurrently with other operations.
```python
class ContentAddressedStore(ABC):
    """
    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

    #### A note on the IPLDKind return types
    Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
    1. No lists or dicts, since python does not classify these as immutable.
    2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
    """

    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """

    @abstractmethod
    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """Retrieve data."""

    async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Pin a CID in the storage."""
        pass  # pragma: no cover

    async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Unpin a CID in the storage."""
        pass  # pragma: no cover

    async def pin_update(
        self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
    ) -> None:
        """Update the pinned CID in the storage."""
        pass  # pragma: no cover

    async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]:
        """List all pinned CIDs in the storage."""
        return []  # pragma: no cover
```
Abstract class that represents a content addressed storage that the HAMT can use for keeping data.
Note that the return type of save and input to load is really type IPLDKind, but the documentation generator pdoc mangles it unfortunately.
A note on the IPLDKind return types
Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
- No lists or dicts, since python does not classify these as immutable.
- No `None` values, since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
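To illustrate that flexibility, a toy subclass (not part of py-hamt) whose IDs are hex digest strings, which python treats as immutable:

```python
import hashlib
from typing import Optional

from py_hamt import ContentAddressedStore


class HexDictCAS(ContentAddressedStore):
    """Toy CAS for illustration: IDs are immutable hex digest strings."""

    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> str:
        key = hashlib.sha256(data).hexdigest()
        self.blobs[key] = data
        return key

    async def load(
        self,
        id,  # expected to be the hex string returned by save()
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        data = self.blobs[id]
        if offset is not None:
            return data[offset : offset + length] if length is not None else data[offset:]
        if suffix is not None:
            return data[-suffix:]
        return data
```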
```python
    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """
```
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
```python
    @abstractmethod
    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """Retrieve data."""
```
Retrieve data.
```python
    async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Pin a CID in the storage."""
        pass  # pragma: no cover
```
Pin a CID in the storage.
```python
    async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None:
        """Unpin a CID in the storage."""
        pass  # pragma: no cover
```
Unpin a CID in the storage.
```python
    async def pin_update(
        self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str
    ) -> None:
        """Update the pinned CID in the storage."""
        pass  # pragma: no cover
```
Update the pinned CID in the storage.
```python
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing. It hashes all inputs and uses the hash as a key into an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self):
        self.store = dict()
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )
        data: bytes
        try:
            data = self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

        if offset is not None:
            start = offset
            if length is not None:
                end = start + length
                return data[start:end]
            else:
                return data[start:]
        elif suffix is not None:
            # Return only the last `suffix` bytes
            return data[-suffix:]
        else:  # Full load
            return data
```
Used mostly for faster testing. It hashes all inputs and uses the hash as a key into an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that save returns and load takes in.
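For instance, a self-contained test sketch using the in-memory backend:

```python
import asyncio

from py_hamt import HAMT, InMemoryCAS


async def main() -> None:
    hamt = await HAMT.build(cas=InMemoryCAS(), values_are_bytes=True)
    await hamt.set("chunk/0", b"\x00\x01\x02")
    assert await hamt.get("chunk/0") == b"\x00\x01\x02"
    assert await hamt.len() == 1


asyncio.run(main())
```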
```python
    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash
```
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
```python
    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )
        data: bytes
        try:
            data = self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc

        if offset is not None:
            start = offset
            if length is not None:
                end = start + length
                return data[start:end]
            else:
                return data[start:]
        elif suffix is not None:
            # Return only the last `suffix` bytes
            return data[-suffix:]
        else:  # Full load
            return data
```
`ContentAddressedStore` allows any IPLD scalar key. For the in-memory backend we *require* a `bytes` hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. This is why we use `cast` here, to tell mypy that we know what we are doing. h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
````python
class KuboCAS(ContentAddressedStore):
    """
    Connects to an **IPFS Kubo** daemon.

    The IDs in save and load are IPLD CIDs.

    * **save()** → RPC (`/api/v0/add`)
    * **load()** → HTTP gateway (`/ipfs/{cid}`)

    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs only access the HTTP Gateway, so no RPC endpoint is required for use.

    ### Authentication / custom headers
    You have two options:

    1. **Bring your own `httpx.AsyncClient`**
       Pass it via `client=...` — any default headers or auth
       configured on that client are reused for **every** request.
    2. **Let `KuboCAS` build the client** but pass
       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
       internally-created `AsyncClient`.

    ```python
    import httpx
    from py_hamt import KuboCAS

    # Option 1: user-supplied client
    client = httpx.AsyncClient(
        headers={"Authorization": "Bearer <token>"},
        auth=("user", "pass"),
    )
    cas = KuboCAS(client=client)

    # Option 2: let KuboCAS create the client
    cas = KuboCAS(
        headers={"X-My-Header": "yes"},
        auth=("user", "pass"),
    )
    ```

    ### Parameters
    - **hasher** (str): multihash name (defaults to *blake3*).
    - **client** (`httpx.AsyncClient | None`): reuse an existing
      client; if *None* KuboCAS will create one lazily.
    - **headers** (dict[str, str] | None): default headers for the
      internally-created client.
    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
      for the internally-created client.
    - **rpc_base_url / gateway_base_url** (str | None): override daemon
      endpoints (defaults match the local daemon ports).
    - **chunker** (str): chunking algorithm specification for Kubo's `add`
      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
      `"rabin-<min>-<avg>-<max>"`.

    ...
    """

    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"

    DAG_PB_MARKER: int = 0x70
    """@private"""

    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
    def __init__(
        self,
        hasher: str = "blake3",
        client: httpx.AsyncClient | None = None,
        rpc_base_url: str | None = None,
        gateway_base_url: str | None = None,
        concurrency: int = 32,
        *,
        headers: dict[str, str] | None = None,
        auth: Tuple[str, str] | None = None,
        pin_on_add: bool = False,
        chunker: str = "size-1048576",
        max_retries: int = 3,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
    ):
        """
        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

        ### `httpx.AsyncClient` Management
        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`, as a class instance cannot know when it will no longer be in use unless explicitly told so.

        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited, which is what we suggest:
        ```python
        async with httpx.AsyncClient() as client, KuboCAS(
            rpc_base_url=rpc_base_url,
            gateway_base_url=gateway_base_url,
            client=client,
        ) as kubo_cas:
            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
            zhs = ZarrHAMTStore(hamt)
            # Use the KuboCAS instance as needed
            # ...
        ```
        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
        ```python
        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
        # Use the KuboCAS instance as needed
        # ...
        await cas.aclose()  # Ensure resources are cleaned up
        ```

        ### Authenticated RPC/Gateway Access
        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
        If you do not need authentication, you can leave these parameters as `None`.

        ### RPC and HTTP Gateway Base URLs
        These are the first part of the url; the defaults refer to what kubo launches with on a local machine.
        """

        self._owns_client: bool = False
        self._closed: bool = True
        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
        self._default_headers = headers
        self._default_auth = auth

        # Now, perform validation that might raise an exception
        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
        if re.fullmatch(chunker_pattern, chunker) is None:
            raise ValueError("Invalid chunker specification")
        self.chunker: str = chunker

        self.hasher: str = hasher
        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""

        if rpc_base_url is None:
            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma: no cover
        if gateway_base_url is None:
            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

        if "/ipfs/" in gateway_base_url:
            gateway_base_url = gateway_base_url.split("/ipfs/")[0]

        # Standard gateway URL construction with proper path handling
        if gateway_base_url.endswith("/"):
            gateway_base_url = f"{gateway_base_url}ipfs/"
        else:
            gateway_base_url = f"{gateway_base_url}/ipfs/"

        pin_string: str = "true" if pin_on_add else "false"
        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}"
        """@private"""
        self.gateway_base_url: str = gateway_base_url
        """@private"""

        if client is not None:
            # A client was supplied by the user. We don't own it.
            self._owns_client = False
            self._client_per_loop = {asyncio.get_running_loop(): client}
        else:
            # No client supplied. We will own any clients we create.
            self._owns_client = True
            self._client_per_loop = {}

        # store for later use by _loop_client()
        self._default_headers = headers
        self._default_auth = auth

        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
        self._closed = False

        # Validate retry parameters
        if max_retries < 0:
            raise ValueError("max_retries must be non-negative")
        if initial_delay <= 0:
            raise ValueError("initial_delay must be positive")
        if backoff_factor < 1.0:
            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")

        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.backoff_factor = backoff_factor

    # --------------------------------------------------------------------- #
    # helper: get or create the client bound to the current running loop    #
    # --------------------------------------------------------------------- #
    def _loop_client(self) -> httpx.AsyncClient:
        """Get or create a client for the current event loop.

        If the instance was previously closed but owns its clients, a fresh
        client mapping is lazily created on demand. Users that supplied their
        own ``httpx.AsyncClient`` still receive an error when the instance has
        been closed, as we cannot safely recreate their client.
        """
        if self._closed:
            if not self._owns_client:
                raise RuntimeError("KuboCAS is closed; create a new instance")
            # We previously closed all internally-owned clients. Reset the
            # state so that new clients can be created lazily.
            self._closed = False
            self._client_per_loop = {}

        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        try:
            return self._client_per_loop[loop]
        except KeyError:
            # Create a new client
            client = httpx.AsyncClient(
                timeout=60.0,
                headers=self._default_headers,
                auth=self._default_auth,
                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
                # Uncomment when they finally support robust HTTP/2 GOAWAY responses
                # http2=True,
            )
            self._client_per_loop[loop] = client
            return client

    # --------------------------------------------------------------------- #
    # graceful shutdown: close **all** clients we own                       #
    # --------------------------------------------------------------------- #
    async def aclose(self) -> None:
        """
        Closes all internally-created clients. Must be called from an async context.
        """
        if self._owns_client is False:  # external client → caller closes
            return

        # This method is async, so we can reliably await the async close method.
        # The complex sync/async logic is handled by __del__.
        for client in list(self._client_per_loop.values()):
            if not client.is_closed:
                try:
                    await client.aclose()
                except Exception:
                    pass  # best-effort cleanup

        self._client_per_loop.clear()
        self._closed = True

    # At this point, _client_per_loop should be empty or only contain
    # clients from loops we haven't seen (which shouldn't happen in practice)
    async def __aenter__(self) -> "KuboCAS":
        return self

    async def __aexit__(self, *exc: Any) -> None:
        await self.aclose()

    def __del__(self) -> None:
        """Best-effort close for internally-created clients."""
        if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
            return

        if not self._owns_client or self._closed:
            return

        # Attempt proper cleanup if possible
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop - can't do async cleanup
            # Just clear the client references synchronously
            if hasattr(self, "_client_per_loop"):
                # We can't await client.aclose() without a loop,
                # so just clear the references
                self._client_per_loop.clear()
            self._closed = True
            return

        # If we get here, we have a running loop
        try:
            if loop.is_running():
                # Schedule cleanup in the existing loop
                loop.create_task(self.aclose())
            else:
                # Loop exists but not running - try asyncio.run
                coro = self.aclose()  # Create the coroutine
                try:
                    asyncio.run(coro)
                except Exception:
                    # If asyncio.run fails, we need to close the coroutine properly
                    coro.close()  # This prevents the RuntimeWarning
                    raise  # Re-raise to hit the outer except block
        except Exception:
            # If all else fails, just clear references
            if hasattr(self, "_client_per_loop"):
                self._client_per_loop.clear()
            self._closed = True

    # --------------------------------------------------------------------- #
    # save() – now uses the per-loop client                                 #
    # --------------------------------------------------------------------- #
    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
        async with self._sem:
            files = {"file": data}
            client = self._loop_client()
            retry_count = 0

            while retry_count <= self.max_retries:
                try:
                    response = await client.post(
                        self.rpc_url, files=files, timeout=60.0
                    )
                    response.raise_for_status()
                    cid_str: str = response.json()["Hash"]
                    cid: CID = CID.decode(cid_str)
                    if cid.codec.code != self.DAG_PB_MARKER:
                        cid = cid.set(codec=codec)
                    return cid

                except (httpx.TimeoutException, httpx.RequestError) as e:
                    retry_count += 1
                    if retry_count > self.max_retries:
                        raise httpx.TimeoutException(
                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
                            request=e.request
                            if isinstance(e, httpx.RequestError)
                            else None,
                        )

                    # Calculate backoff delay
                    delay = self.initial_delay * (
                        self.backoff_factor ** (retry_count - 1)
                    )
                    # Add some jitter to prevent thundering herd
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)

                except httpx.HTTPStatusError:
                    # Re-raise non-timeout HTTP errors immediately
                    raise
        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

    async def load(
        self,
        id: IPLDKind,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> bytes:
        """Load data from a CID using the IPFS gateway with optional Range requests."""
        cid = cast(CID, id)
        url: str = f"{self.gateway_base_url + str(cid)}"
        headers: Dict[str, str] = {}

        # Construct the Range header if required
        if offset is not None:
            start = offset
            if length is not None:
                # Standard HTTP Range: bytes=start-end (inclusive)
                end = start + length - 1
                headers["Range"] = f"bytes={start}-{end}"
            else:
                # Standard HTTP Range: bytes=start- (from start to end)
                headers["Range"] = f"bytes={start}-"
        elif suffix is not None:
            # Standard HTTP Range: bytes=-N (last N bytes)
            headers["Range"] = f"bytes=-{suffix}"

        async with self._sem:  # Throttle gateway
            client = self._loop_client()
            retry_count = 0

            while retry_count <= self.max_retries:
                try:
                    response = await client.get(
                        url, headers=headers or None, timeout=60.0
                    )
                    response.raise_for_status()
                    return response.content

                except (httpx.TimeoutException, httpx.RequestError) as e:
                    retry_count += 1
                    if retry_count > self.max_retries:
                        raise httpx.TimeoutException(
                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
                            request=e.request
                            if isinstance(e, httpx.RequestError)
                            else None,
                        )

                    # Calculate backoff delay with jitter
                    delay = self.initial_delay * (
                        self.backoff_factor ** (retry_count - 1)
                    )
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)

                except httpx.HTTPStatusError:
                    # Re-raise non-timeout HTTP errors immediately
                    raise

        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

    # --------------------------------------------------------------------- #
    # pin_cid() – method to pin a CID                                       #
    # --------------------------------------------------------------------- #
    async def pin_cid(
        self,
        cid: CID,
        target_rpc: str = "http://127.0.0.1:5001",
    ) -> None:
        """
        Pins a CID to the local Kubo node via the RPC API.

        This call is recursive by default, pinning all linked objects.

        Args:
            cid (CID): The Content ID to pin.
            target_rpc (str): The RPC URL of the Kubo node.
        """
        params = {"arg": str(cid), "recursive": "true"}
        pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add"

        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(pin_add_url_base, params=params)
            response.raise_for_status()

    async def unpin_cid(
        self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
    ) -> None:
        """
        Unpins a CID from the local Kubo node via the RPC API.

        Args:
            cid (CID): The Content ID to unpin.
        """
        params = {"arg": str(cid), "recursive": "true"}
        unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm"
        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(unpin_url_base, params=params)
            response.raise_for_status()

    async def pin_update(
        self,
        old_id: IPLDKind,
        new_id: IPLDKind,
        target_rpc: str = "http://127.0.0.1:5001",
    ) -> None:
        """
        Updates the pinned CID in the storage.

        Args:
            old_id (IPLDKind): The old Content ID to replace.
            new_id (IPLDKind): The new Content ID to pin.
        """
        params = {"arg": [str(old_id), str(new_id)]}
        pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update"
        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(pin_update_url_base, params=params)
            response.raise_for_status()

    async def pin_ls(
        self, target_rpc: str = "http://127.0.0.1:5001"
    ) -> list[Dict[str, Any]]:
        """
        Lists all pinned CIDs on the local Kubo node via the RPC API.

        Args:
            target_rpc (str): The RPC URL of the Kubo node.

        Returns:
            list[Dict[str, Any]]: The pinned CIDs as returned by the RPC API.
        """
        pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
        async with self._sem:  # throttle RPC
            client = self._loop_client()
            response = await client.post(pin_ls_url_base)
            response.raise_for_status()
            pins = response.json().get("Keys", [])
            return pins
````
Connects to an IPFS Kubo daemon.
The IDs in save and load are IPLD CIDs.
- save() → RPC (`/api/v0/add`)
- load() → HTTP gateway (`/ipfs/{cid}`)
save uses the RPC API and load uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
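For example, a read-only consumer only needs the gateway to be reachable; here is a minimal sketch (the gateway URL and root CID are placeholders, not required values):

```python
from py_hamt import HAMT, KuboCAS

async def read_foo(root_cid):
    # Read-only access goes through the HTTP gateway only,
    # so no RPC endpoint has to be reachable.
    async with KuboCAS(gateway_base_url="http://127.0.0.1:8080") as cas:
        hamt = await HAMT.build(cas=cas, root_node_id=root_cid, read_only=True)
        return await hamt.get("foo")
```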
Authentication / custom headers
You have two options:
- Bring your own `httpx.AsyncClient`: pass it via `client=...`; any default headers or auth configured on that client are reused for every request.
- Let `KuboCAS` build the client, but pass `headers=` and/or `auth=` kwargs; they are forwarded to the internally created `AsyncClient`.
import httpx
from py_hamt import KuboCAS
# Option 1: user-supplied client
client = httpx.AsyncClient(
headers={"Authorization": "Bearer <token>"},
auth=("user", "pass"),
)
cas = KuboCAS(client=client)
# Option 2: let KuboCAS create the client
cas = KuboCAS(
headers={"X-My-Header": "yes"},
auth=("user", "pass"),
)
Parameters
- hasher (str): multihash name (defaults to blake3).
- client (httpx.AsyncClient | None): reuse an existing client; if None, KuboCAS will create one lazily.
- headers (dict[str, str] | None): default headers for the internally-created client.
- auth (tuple[str, str] | None): authentication tuple (username, password) for the internally-created client.
- rpc_base_url / gateway_base_url (str | None): override daemon endpoints (defaults match the local daemon ports).
- chunker (str): chunking algorithm specification for Kubo's `add` RPC. Accepted formats are "size-<positive int>", "rabin", or "rabin-<min>-<avg>-<max>".
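As a quick illustration of the accepted chunker formats (the sizes below are arbitrary examples, not recommendations):

```python
from py_hamt import KuboCAS

# Fixed-size chunks of 1 MiB (matches the default "size-1048576")
cas_fixed = KuboCAS(chunker="size-1048576")

# Content-defined chunking with Kubo's rabin chunker and default parameters
cas_rabin = KuboCAS(chunker="rabin")

# Rabin chunking with explicit min/avg/max chunk sizes in bytes
cas_rabin_tuned = KuboCAS(chunker="rabin-262144-524288-1048576")
```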
...
182 def __init__( 183 self, 184 hasher: str = "blake3", 185 client: httpx.AsyncClient | None = None, 186 rpc_base_url: str | None = None, 187 gateway_base_url: str | None = None, 188 concurrency: int = 32, 189 *, 190 headers: dict[str, str] | None = None, 191 auth: Tuple[str, str] | None = None, 192 pin_on_add: bool = False, 193 chunker: str = "size-1048576", 194 max_retries: int = 3, 195 initial_delay: float = 1.0, 196 backoff_factor: float = 2.0, 197 ): 198 """ 199 If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all. 200 201 ### `httpx.AsyncClient` Management 202 If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()` 203 as a class instance cannot know when it will no longer be in use, unless explicitly told to do so. 204 205 If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below: 206 ```python 207 async with httpx.AsyncClient() as client, KuboCAS( 208 rpc_base_url=rpc_base_url, 209 gateway_base_url=gateway_base_url, 210 client=client, 211 ) as kubo_cas: 212 hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True) 213 zhs = ZarrHAMTStore(hamt) 214 # Use the KuboCAS instance as needed 215 # ... 216 ``` 217 As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up. 218 ``` python 219 cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url) 220 # Use the KuboCAS instance as needed 221 # ... 222 await cas.aclose() # Ensure resources are cleaned up 223 ``` 224 225 ### Authenticated RPC/Gateway Access 226 Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in. 227 Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided. 228 If you do not need authentication, you can leave these parameters as `None`. 229 230 ### RPC and HTTP Gateway Base URLs 231 These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided. 232 """ 233 234 self._owns_client: bool = False 235 self._closed: bool = True 236 self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {} 237 self._default_headers = headers 238 self._default_auth = auth 239 240 # Now, perform validation that might raise an exception 241 chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)" 242 if re.fullmatch(chunker_pattern, chunker) is None: 243 raise ValueError("Invalid chunker specification") 244 self.chunker: str = chunker 245 246 self.hasher: str = hasher 247 """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. 
The default blake3 follows the default hashing algorithm used by HAMT.""" 248 249 if rpc_base_url is None: 250 rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL # pragma 251 if gateway_base_url is None: 252 gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL 253 254 if "/ipfs/" in gateway_base_url: 255 gateway_base_url = gateway_base_url.split("/ipfs/")[0] 256 257 # Standard gateway URL construction with proper path handling 258 if gateway_base_url.endswith("/"): 259 gateway_base_url = f"{gateway_base_url}ipfs/" 260 else: 261 gateway_base_url = f"{gateway_base_url}/ipfs/" 262 263 pin_string: str = "true" if pin_on_add else "false" 264 self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}" 265 """@private""" 266 self.gateway_base_url: str = gateway_base_url 267 """@private""" 268 269 if client is not None: 270 # A client was supplied by the user. We don't own it. 271 self._owns_client = False 272 self._client_per_loop = {asyncio.get_running_loop(): client} 273 else: 274 # No client supplied. We will own any clients we create. 275 self._owns_client = True 276 self._client_per_loop = {} 277 278 # store for later use by _loop_client() 279 self._default_headers = headers 280 self._default_auth = auth 281 282 self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency) 283 self._closed = False 284 285 # Validate retry parameters 286 if max_retries < 0: 287 raise ValueError("max_retries must be non-negative") 288 if initial_delay <= 0: 289 raise ValueError("initial_delay must be positive") 290 if backoff_factor < 1.0: 291 raise ValueError("backoff_factor must be >= 1.0 for exponential backoff") 292 293 self.max_retries = max_retries 294 self.initial_delay = initial_delay 295 self.backoff_factor = backoff_factor
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
httpx.AsyncClient Management
If client is not provided, one will be created automatically. It is the user's responsibility to close it at an appropriate time with await cas.aclose(), since a class instance cannot know when it is no longer in use unless explicitly told.
If you are using the KuboCAS instance in an async with block, the client is closed automatically when the block exits; this is the usage we suggest:
async with httpx.AsyncClient() as client, KuboCAS(
rpc_base_url=rpc_base_url,
gateway_base_url=gateway_base_url,
client=client,
) as kubo_cas:
hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
zhs = ZarrHAMTStore(hamt)
# Use the KuboCAS instance as needed
# ...
As mentioned, if you do not use the async with syntax, you should call await cas.aclose() when you are done using the instance to ensure that all resources are cleaned up.
cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose() # Ensure resources are cleaned up
Authenticated RPC/Gateway Access
If you are connecting to an authenticated Kubo instance, configure whatever headers and auth credentials you need on your own httpx.AsyncClient and pass that in.
Alternatively, they can pass in headers and auth parameters to the constructor, which will be used to create a new httpx.AsyncClient if one is not provided.
If you do not need authentication, you can leave these parameters as None.
RPC and HTTP Gateway Base URLs
These are the first part of the URL; the defaults match the endpoints a local Kubo daemon launches with.
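For example, to point at a remote daemon instead of the local defaults (the hostnames below are placeholders):

```python
from py_hamt import KuboCAS

cas = KuboCAS(
    rpc_base_url="http://ipfs-rpc.example.com:5001",
    gateway_base_url="http://ipfs-gw.example.com:8080",
)
```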
The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.
335 async def aclose(self) -> None: 336 """ 337 Closes all internally-created clients. Must be called from an async context. 338 """ 339 if self._owns_client is False: # external client → caller closes 340 return 341 342 # This method is async, so we can reliably await the async close method. 343 # The complex sync/async logic is handled by __del__. 344 for client in list(self._client_per_loop.values()): 345 if not client.is_closed: 346 try: 347 await client.aclose() 348 except Exception: 349 pass # best-effort cleanup 350 351 self._client_per_loop.clear() 352 self._closed = True
Closes all internally-created clients. Must be called from an async context.
406 async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID: 407 async with self._sem: 408 files = {"file": data} 409 client = self._loop_client() 410 retry_count = 0 411 412 while retry_count <= self.max_retries: 413 try: 414 response = await client.post( 415 self.rpc_url, files=files, timeout=60.0 416 ) 417 response.raise_for_status() 418 cid_str: str = response.json()["Hash"] 419 cid: CID = CID.decode(cid_str) 420 if cid.codec.code != self.DAG_PB_MARKER: 421 cid = cid.set(codec=codec) 422 return cid 423 424 except (httpx.TimeoutException, httpx.RequestError) as e: 425 retry_count += 1 426 if retry_count > self.max_retries: 427 raise httpx.TimeoutException( 428 f"Failed to save data after {self.max_retries} retries: {str(e)}", 429 request=e.request 430 if isinstance(e, httpx.RequestError) 431 else None, 432 ) 433 434 # Calculate backoff delay 435 delay = self.initial_delay * ( 436 self.backoff_factor ** (retry_count - 1) 437 ) 438 # Add some jitter to prevent thundering herd 439 jitter = delay * 0.1 * (random.random() - 0.5) 440 await asyncio.sleep(delay + jitter) 441 442 except httpx.HTTPStatusError: 443 # Re-raise non-timeout HTTP errors immediately 444 raise 445 raise RuntimeError("Exited the retry loop unexpectedly.") # pragma: no cover
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec will be set to "dag-cbor" if this data should be marked as linked data per the IPLD data model.
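A minimal sketch of calling save with both kinds of payloads; "dag-cbor" is used elsewhere in this module, while passing "raw" for plain payloads is an assumption made for illustration:

```python
import dag_cbor

async def store_examples(cas):
    # Plain payload, stored as-is ("raw" codec assumed for illustration)
    raw_cid = await cas.save(b"hello world", codec="raw")

    # IPLD node: encode with dag-cbor and mark it so IPLD-aware tooling
    # can follow links inside the block
    node_cid = await cas.save(dag_cbor.encode({"answer": 42}), codec="dag-cbor")
    return raw_cid, node_cid
```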
447 async def load( 448 self, 449 id: IPLDKind, 450 offset: Optional[int] = None, 451 length: Optional[int] = None, 452 suffix: Optional[int] = None, 453 ) -> bytes: 454 """Load data from a CID using the IPFS gateway with optional Range requests.""" 455 cid = cast(CID, id) 456 url: str = f"{self.gateway_base_url + str(cid)}" 457 headers: Dict[str, str] = {} 458 459 # Construct the Range header if required 460 if offset is not None: 461 start = offset 462 if length is not None: 463 # Standard HTTP Range: bytes=start-end (inclusive) 464 end = start + length - 1 465 headers["Range"] = f"bytes={start}-{end}" 466 else: 467 # Standard HTTP Range: bytes=start- (from start to end) 468 headers["Range"] = f"bytes={start}-" 469 elif suffix is not None: 470 # Standard HTTP Range: bytes=-N (last N bytes) 471 headers["Range"] = f"bytes=-{suffix}" 472 473 async with self._sem: # Throttle gateway 474 client = self._loop_client() 475 retry_count = 0 476 477 while retry_count <= self.max_retries: 478 try: 479 response = await client.get( 480 url, headers=headers or None, timeout=60.0 481 ) 482 response.raise_for_status() 483 return response.content 484 485 except (httpx.TimeoutException, httpx.RequestError) as e: 486 retry_count += 1 487 if retry_count > self.max_retries: 488 raise httpx.TimeoutException( 489 f"Failed to load data after {self.max_retries} retries: {str(e)}", 490 request=e.request 491 if isinstance(e, httpx.RequestError) 492 else None, 493 ) 494 495 # Calculate backoff delay with jitter 496 delay = self.initial_delay * ( 497 self.backoff_factor ** (retry_count - 1) 498 ) 499 jitter = delay * 0.1 * (random.random() - 0.5) 500 await asyncio.sleep(delay + jitter) 501 502 except httpx.HTTPStatusError: 503 # Re-raise non-timeout HTTP errors immediately 504 raise 505 506 raise RuntimeError("Exited the retry loop unexpectedly.") # pragma: no cover
Load data from a CID using the IPFS gateway with optional Range requests.
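A sketch of ranged reads through the gateway, mirroring the offset/length/suffix parameters above (the byte offsets are illustrative):

```python
async def read_slices(cas, cid):
    whole = await cas.load(cid)                            # full block
    middle = await cas.load(cid, offset=100, length=100)   # bytes 100-199 inclusive
    tail = await cas.load(cid, suffix=16)                  # last 16 bytes
    return whole, middle, tail
```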
511 async def pin_cid( 512 self, 513 cid: CID, 514 target_rpc: str = "http://127.0.0.1:5001", 515 ) -> None: 516 """ 517 Pins a CID to the local Kubo node via the RPC API. 518 519 This call is recursive by default, pinning all linked objects. 520 521 Args: 522 cid (CID): The Content ID to pin. 523 target_rpc (str): The RPC URL of the Kubo node. 524 """ 525 params = {"arg": str(cid), "recursive": "true"} 526 pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add" 527 528 async with self._sem: # throttle RPC 529 client = self._loop_client() 530 response = await client.post(pin_add_url_base, params=params) 531 response.raise_for_status()
Pins a CID to the local Kubo node via the RPC API.
This call is recursive by default, pinning all linked objects.
Args: cid (CID): The Content ID to pin. target_rpc (str): The RPC URL of the Kubo node.
533 async def unpin_cid( 534 self, cid: CID, target_rpc: str = "http://127.0.0.1:5001" 535 ) -> None: 536 """ 537 Unpins a CID from the local Kubo node via the RPC API. 538 539 Args: 540 cid (CID): The Content ID to unpin. 541 """ 542 params = {"arg": str(cid), "recursive": "true"} 543 unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm" 544 async with self._sem: # throttle RPC 545 client = self._loop_client() 546 response = await client.post(unpin_url_base, params=params) 547 response.raise_for_status()
Unpins a CID from the local Kubo node via the RPC API.
Args: cid (CID): The Content ID to unpin.
549 async def pin_update( 550 self, 551 old_id: IPLDKind, 552 new_id: IPLDKind, 553 target_rpc: str = "http://127.0.0.1:5001", 554 ) -> None: 555 """ 556 Updates the pinned CID in the storage. 557 558 Args: 559 old_id (IPLDKind): The old Content ID to replace. 560 new_id (IPLDKind): The new Content ID to pin. 561 """ 562 params = {"arg": [str(old_id), str(new_id)]} 563 pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update" 564 async with self._sem: # throttle RPC 565 client = self._loop_client() 566 response = await client.post(pin_update_url_base, params=params) 567 response.raise_for_status()
Updates the pinned CID in the storage.
Args: old_id (IPLDKind): The old Content ID to replace. new_id (IPLDKind): The new Content ID to pin.
569 async def pin_ls( 570 self, target_rpc: str = "http://127.0.0.1:5001" 571 ) -> list[Dict[str, Any]]: 572 """ 573 Lists all pinned CIDs on the local Kubo node via the RPC API. 574 575 Args: 576 target_rpc (str): The RPC URL of the Kubo node. 577 578 Returns: 579 List[CID]: A list of pinned CIDs. 580 """ 581 pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls" 582 async with self._sem: # throttle RPC 583 client = self._loop_client() 584 response = await client.post(pin_ls_url_base) 585 response.raise_for_status() 586 pins = response.json().get("Keys", []) 587 return pins
Lists all pinned CIDs on the local Kubo node via the RPC API.
Args: target_rpc (str): The RPC URL of the Kubo node.
Returns: list[dict[str, Any]]: The pinned entries reported by the node.
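A small pinning workflow tying these helpers together might look like this sketch (using the default local RPC endpoint):

```python
async def pin_workflow(cas, cid, updated_cid):
    # Pin a root CID recursively on the local node
    await cas.pin_cid(cid)

    # Later, swap the pin over to a newer root in one call
    await cas.pin_update(cid, updated_cid)

    # Inspect what the node currently reports as pinned
    print(await cas.pin_ls())

    # Clean up when the data is no longer needed
    await cas.unpin_cid(updated_cid)
```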
13class ZarrHAMTStore(zarr.abc.store.Store): 14 """ 15 Write and read Zarr v3s with a HAMT. 16 17 Read **or** write a Zarr-v3 store whose key/value pairs live inside a 18 py-hamt mapping. 19 20 Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and 21 the value is the raw byte payload produced by Zarr. No additional 22 framing, compression, or encryption is applied by this class. For a zarr encryption example 23 see where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb 24 For a fully encrypted zarr store, where metadata is not available, please see 25 :class:`SimpleEncryptedZarrHAMTStore` but we do not recommend using it. 26 27 #### A note about using the same `ZarrHAMTStore` for writing and then reading again 28 If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings. 29 30 #### Sample Code 31 ```python 32 # --- Write --- 33 ds: xarray.Dataset = # ... 34 cas: ContentAddressedStore = # ... 35 hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes 36 hamt = await HAMT.build(cas, values_are_bytes=True) # write-enabled 37 zhs = ZarrHAMTStore(hamt, read_only=False) 38 ds.to_zarr(store=zhs, mode="w", zarr_format=3) 39 await hamt.make_read_only() # flush + freeze 40 root_node_id = hamt.root_node_id 41 print(root_node_id) 42 43 # --- read --- 44 hamt_ro = await HAMT.build( 45 cas, root_node_id=root_cid, read_only=True, values_are_bytes=True 46 ) 47 zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True) 48 ds_ro = xarray.open_zarr(store=zhs_ro) 49 50 51 print(ds_ro) 52 xarray.testing.assert_identical(ds, ds_ro) 53 ``` 54 """ 55 56 _forced_read_only: bool | None = None # sentinel for wrapper clones 57 58 def __init__(self, hamt: HAMT, read_only: bool = False) -> None: 59 """ 60 ### `hamt` and `read_only` 61 You need to make sure the following two things are true: 62 63 1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async. 64 2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations. 65 66 ##### A note about the zarr chunk separator, "/" vs "." 67 While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well. 68 69 #### Metadata Read Cache 70 `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data. 71 """ 72 super().__init__(read_only=read_only) 73 74 assert hamt.read_only == read_only 75 assert hamt.values_are_bytes 76 self.hamt: HAMT = hamt 77 """ 78 The internal HAMT. 79 Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID. 
80 """ 81 82 self.metadata_read_cache: dict[str, bytes] = {} 83 """@private""" 84 85 def _map_byte_request( 86 self, byte_range: Optional[zarr.abc.store.ByteRequest] 87 ) -> tuple[Optional[int], Optional[int], Optional[int]]: 88 """Helper to map Zarr ByteRequest to offset, length, suffix.""" 89 offset: Optional[int] = None 90 length: Optional[int] = None 91 suffix: Optional[int] = None 92 93 if byte_range: 94 if isinstance(byte_range, zarr.abc.store.RangeByteRequest): 95 offset = byte_range.start 96 length = byte_range.end - byte_range.start 97 if length is not None and length < 0: 98 raise ValueError("End must be >= start for RangeByteRequest") 99 elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest): 100 offset = byte_range.offset 101 elif isinstance(byte_range, zarr.abc.store.SuffixByteRequest): 102 suffix = byte_range.suffix 103 else: 104 raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}") 105 106 return offset, length, suffix 107 108 @property 109 def read_only(self) -> bool: # type: ignore[override] 110 if self._forced_read_only is not None: # instance attr overrides 111 return self._forced_read_only 112 return self.hamt.read_only 113 114 def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore": 115 """ 116 Return this store (if the flag already matches) or a *shallow* 117 clone that presents the requested read‑only status. 118 119 The clone **shares** the same :class:`~py_hamt.hamt.HAMT` 120 instance; no flushing, network traffic or async work is done. 121 """ 122 # Fast path 123 if read_only == self.read_only: 124 return self # Same mode, return same instance 125 126 # Create new instance with different read_only flag 127 # Creates a *bare* instance without running its __init__ 128 clone = type(self).__new__(type(self)) 129 130 # Copy attributes that matter 131 clone.hamt = self.hamt # Share the HAMT 132 clone._forced_read_only = read_only 133 clone.metadata_read_cache = self.metadata_read_cache.copy() 134 135 # Re‑initialise the zarr base class so that Zarr sees the flag 136 zarr.abc.store.Store.__init__(clone, read_only=read_only) 137 return clone 138 139 def __eq__(self, other: object) -> bool: 140 """@private""" 141 if not isinstance(other, ZarrHAMTStore): 142 return False 143 return self.hamt.root_node_id == other.hamt.root_node_id 144 145 async def get( 146 self, 147 key: str, 148 prototype: zarr.core.buffer.BufferPrototype, 149 byte_range: zarr.abc.store.ByteRequest | None = None, 150 ) -> zarr.core.buffer.Buffer | None: 151 """@private""" 152 try: 153 val: bytes 154 # do len check to avoid indexing into overly short strings, 3.12 does not throw errors but we dont know if other versions will 155 is_metadata: bool = ( 156 len(key) >= 9 and key[-9:] == "zarr.json" 157 ) # if path ends with zarr.json 158 159 if is_metadata and byte_range is None and key in self.metadata_read_cache: 160 val = self.metadata_read_cache[key] 161 else: 162 offset, length, suffix = self._map_byte_request(byte_range) 163 val = cast( 164 bytes, 165 await self.hamt.get( 166 key, offset=offset, length=length, suffix=suffix 167 ), 168 ) # We know values received will always be bytes since we only store bytes in the HAMT 169 if is_metadata and byte_range is None: 170 self.metadata_read_cache[key] = val 171 172 return prototype.buffer.from_bytes(val) 173 except KeyError: 174 # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases 175 return None 176 except Exception as e: 177 print(f"Error getting key '{key}' with range {byte_range}: 
{e}") 178 raise 179 180 async def get_partial_values( 181 self, 182 prototype: zarr.core.buffer.BufferPrototype, 183 key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]], 184 ) -> list[zarr.core.buffer.Buffer | None]: 185 """ 186 Retrieves multiple keys or byte ranges concurrently using asyncio.gather. 187 """ 188 tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges] 189 results = await asyncio.gather( 190 *tasks, return_exceptions=False 191 ) # Set return_exceptions=True for debugging 192 return results 193 194 async def exists(self, key: str) -> bool: 195 """@private""" 196 try: 197 await self.hamt.get(key) 198 return True 199 except KeyError: 200 return False 201 202 @property 203 def supports_writes(self) -> bool: 204 """@private""" 205 return not self.hamt.read_only 206 207 @property 208 def supports_partial_writes(self) -> bool: 209 """@private""" 210 return False 211 212 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 213 """@private""" 214 if self.read_only: 215 raise Exception("Cannot write to a read-only store.") 216 217 if key in self.metadata_read_cache: 218 self.metadata_read_cache[key] = value.to_bytes() 219 await self.hamt.set(key, value.to_bytes()) 220 221 async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None: 222 """@private""" 223 if not (await self.exists(key)): 224 await self.set(key, value) 225 226 async def set_partial_values( 227 self, key_start_values: Iterable[tuple[str, int, BytesLike]] 228 ) -> None: 229 """@private""" 230 raise NotImplementedError 231 232 @property 233 def supports_deletes(self) -> bool: 234 """@private""" 235 return not self.hamt.read_only 236 237 async def delete(self, key: str) -> None: 238 """@private""" 239 if self.read_only: 240 raise Exception("Cannot write to a read-only store.") 241 try: 242 await self.hamt.delete(key) 243 # In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo 244 # if key in self.metadata_read_cache: 245 # del self.metadata_read_cache[key] 246 # It's fine if the key was not in the HAMT 247 # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues 248 except KeyError: 249 return 250 251 @property 252 def supports_listing(self) -> bool: 253 """@private""" 254 return True 255 256 async def list(self) -> AsyncIterator[str]: 257 """@private""" 258 async for key in self.hamt.keys(): 259 yield key 260 261 async def list_prefix(self, prefix: str) -> AsyncIterator[str]: 262 """@private""" 263 async for key in self.hamt.keys(): 264 if key.startswith(prefix): 265 yield key 266 267 async def list_dir(self, prefix: str) -> AsyncIterator[str]: 268 """ 269 @private 270 List *immediate* children that live directly under **prefix**. 271 272 This is similar to :py:meth:`list_prefix` but collapses everything 273 below the first ``"/"`` after *prefix*. Each child name is yielded 274 **exactly once** in the order of first appearance while scanning the 275 HAMT keys. 276 277 Parameters 278 ---------- 279 prefix : str 280 Logical directory path. *Must* end with ``"/"`` for the result to 281 make sense (e.g. ``"a/b/"``). 282 283 Yields 284 ------ 285 str 286 The name of each direct child (file or sub-directory) of *prefix*. 
287 288 Examples 289 -------- 290 With keys :: 291 292 a/b/c/d 293 a/b/c/e 294 a/b/f 295 a/b/g/h/i 296 297 ``await list_dir("a/b/")`` produces :: 298 299 c 300 f 301 g 302 303 Notes 304 ----- 305 • Internally uses a :class:`set` to deduplicate names; memory grows 306 with the number of *unique* children, not the total number of keys. 307 • Order is **not** sorted; it reflects the first encounter while 308 iterating over :py:meth:`HAMT.keys`. 309 """ 310 seen_names: set[str] = set() 311 async for key in self.hamt.keys(): 312 if key.startswith(prefix): 313 suffix: str = key[len(prefix) :] 314 first_slash: int = suffix.find("/") 315 if first_slash == -1: 316 if suffix not in seen_names: 317 seen_names.add(suffix) 318 yield suffix 319 else: 320 name: str = suffix[0:first_slash] 321 if name not in seen_names: 322 seen_names.add(name) 323 yield name
Write and read Zarr v3s with a HAMT.
Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.
Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT) and the value is the raw byte payload produced by Zarr. No additional framing, compression, or encryption is applied by this class.
For an example of Zarr encryption where the metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
For a fully encrypted Zarr store, where metadata is not readable, see SimpleEncryptedZarrHAMTStore, although we do not recommend using it.
A note about using the same ZarrHAMTStore for writing and then reading again
If you write a Zarr to a HAMT and then switch it to read-only mode, it is best to initialize a new ZarrHAMTStore with the proper read-only setting. This is because this class, to err on the safe side, does not modify its superclass's settings.
Sample Code
# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True) # write-enabled
zhs = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)
# --- read ---
hamt_ro = await HAMT.build(
cas, root_node_id=root_cid, read_only=True, values_are_bytes=True
)
zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)
print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
58 def __init__(self, hamt: HAMT, read_only: bool = False) -> None: 59 """ 60 ### `hamt` and `read_only` 61 You need to make sure the following two things are true: 62 63 1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async. 64 2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations. 65 66 ##### A note about the zarr chunk separator, "/" vs "." 67 While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well. 68 69 #### Metadata Read Cache 70 `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data. 71 """ 72 super().__init__(read_only=read_only) 73 74 assert hamt.read_only == read_only 75 assert hamt.values_are_bytes 76 self.hamt: HAMT = hamt 77 """ 78 The internal HAMT. 79 Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID. 80 """ 81 82 self.metadata_read_cache: dict[str, bytes] = {} 83 """@private"""
hamt and read_only
You need to make sure the following two things are true:
- The HAMT is in the same read-only mode that you are passing into the Zarr store, i.e. hamt.read_only == read_only. This is because making a HAMT read-only requires async operations, but __init__ cannot be async.
- The HAMT has hamt.values_are_bytes == True. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."
While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
Metadata Read Cache
ZarrHAMTStore has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more frequently, and more repetitively, than any other keys, so caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are much smaller than the memory footprint of the Zarr data itself.
The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
108 @property 109 def read_only(self) -> bool: # type: ignore[override] 110 if self._forced_read_only is not None: # instance attr overrides 111 return self._forced_read_only 112 return self.hamt.read_only
Is the store read-only?
114 def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore": 115 """ 116 Return this store (if the flag already matches) or a *shallow* 117 clone that presents the requested read‑only status. 118 119 The clone **shares** the same :class:`~py_hamt.hamt.HAMT` 120 instance; no flushing, network traffic or async work is done. 121 """ 122 # Fast path 123 if read_only == self.read_only: 124 return self # Same mode, return same instance 125 126 # Create new instance with different read_only flag 127 # Creates a *bare* instance without running its __init__ 128 clone = type(self).__new__(type(self)) 129 130 # Copy attributes that matter 131 clone.hamt = self.hamt # Share the HAMT 132 clone._forced_read_only = read_only 133 clone.metadata_read_cache = self.metadata_read_cache.copy() 134 135 # Re‑initialise the zarr base class so that Zarr sees the flag 136 zarr.abc.store.Store.__init__(clone, read_only=read_only) 137 return clone
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same py_hamt.hamt.HAMT instance; no flushing, network traffic, or async work is done.
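For instance, obtaining a read-only view of a write-enabled store (a sketch; zhs is assumed to be an existing ZarrHAMTStore):

```python
# The clone shares the underlying HAMT; only the read-only flag differs.
zhs_ro = zhs.with_read_only(True)
assert zhs_ro.read_only
assert zhs_ro.hamt is zhs.hamt
```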
180 async def get_partial_values( 181 self, 182 prototype: zarr.core.buffer.BufferPrototype, 183 key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]], 184 ) -> list[zarr.core.buffer.Buffer | None]: 185 """ 186 Retrieves multiple keys or byte ranges concurrently using asyncio.gather. 187 """ 188 tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges] 189 results = await asyncio.gather( 190 *tasks, return_exceptions=False 191 ) # Set return_exceptions=True for debugging 192 return results
Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore): 14 """ 15 Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy. 16 17 This store uses ChaCha20-Poly1305 to encrypt every single key-value pair 18 stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.) 19 and data chunks. This provides strong privacy but means the Zarr store is 20 completely opaque and unusable without the correct encryption key and header. 21 22 Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb 23 24 #### Encryption Details 25 - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce). 26 - Requires a 32-byte encryption key and a header. 27 - Each encrypted value includes a 24-byte nonce and a 16-byte tag. 28 29 #### Important Considerations 30 - Since metadata is encrypted, standard Zarr tools cannot inspect the 31 dataset without prior decryption using this store class. 32 - There is no support for partial encryption or excluding variables. 33 - There is no metadata caching. 34 35 #### Sample Code 36 ```python 37 import xarray 38 from py_hamt import HAMT, KuboCAS # Assuming an KuboCAS or similar 39 from Crypto.Random import get_random_bytes 40 import numpy as np 41 42 # Setup 43 ds = xarray.Dataset( 44 {"data": (("y", "x"), np.arange(12).reshape(3, 4))}, 45 coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]} 46 ) 47 cas = KuboCAS() # Example ContentAddressedStore 48 encryption_key = get_random_bytes(32) 49 header = b"fully-encrypted-zarr" 50 51 # --- Write --- 52 hamt_write = await HAMT.build(cas=cas, values_are_bytes=True) 53 ezhs_write = SimpleEncryptedZarrHAMTStore( 54 hamt_write, False, encryption_key, header 55 ) 56 print("Writing fully encrypted Zarr...") 57 ds.to_zarr(store=ezhs_write, mode="w") 58 await hamt_write.make_read_only() 59 root_node_id = hamt_write.root_node_id 60 print(f"Wrote Zarr with root: {root_node_id}") 61 62 # --- Read --- 63 hamt_read = await HAMT.build( 64 cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True 65 ) 66 ezhs_read = SimpleEncryptedZarrHAMTStore( 67 hamt_read, True, encryption_key, header 68 ) 69 print("\nReading fully encrypted Zarr...") 70 ds_read = xarray.open_zarr(store=ezhs_read) 71 print("Read back dataset:") 72 print(ds_read) 73 xarray.testing.assert_identical(ds, ds_read) 74 print("Read successful and data verified.") 75 76 # --- Read with wrong key (demonstrates failure) --- 77 wrong_key = get_random_bytes(32) 78 hamt_bad = await HAMT.build( 79 cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True 80 ) 81 ezhs_bad = SimpleEncryptedZarrHAMTStore( 82 hamt_bad, True, wrong_key, header 83 ) 84 print("\nAttempting to read with wrong key...") 85 try: 86 ds_bad = xarray.open_zarr(store=ezhs_bad) 87 print(ds_bad) 88 except Exception as e: 89 print(f"Failed to read as expected: {type(e).__name__} - {e}") 90 ``` 91 """ 92 93 def __init__( 94 self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes 95 ) -> None: 96 """ 97 Initializes the SimpleEncryptedZarrHAMTStore. 98 99 Args: 100 hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. 101 Its `read_only` status must match the `read_only` argument. 102 read_only: If True, the store is in read-only mode. 103 encryption_key: A 32-byte key for ChaCha20-Poly1305. 
104 header: A header (bytes) used as associated data in encryption. 105 """ 106 super().__init__(hamt, read_only=read_only) 107 108 if len(encryption_key) != 32: 109 raise ValueError("Encryption key must be exactly 32 bytes long.") 110 self.encryption_key = encryption_key 111 self.header = header 112 self.metadata_read_cache: dict[str, bytes] = {} 113 114 def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore": 115 if read_only == self.read_only: 116 return self 117 118 clone = type(self).__new__(type(self)) 119 clone.hamt = self.hamt 120 clone.encryption_key = self.encryption_key 121 clone.header = self.header 122 clone.metadata_read_cache = self.metadata_read_cache 123 clone._forced_read_only = read_only # safe; attribute is declared 124 zarr.abc.store.Store.__init__(clone, read_only=read_only) 125 return clone 126 127 def _encrypt(self, val: bytes) -> bytes: 128 """Encrypts data using ChaCha20-Poly1305.""" 129 nonce = get_random_bytes(24) # XChaCha20 uses a 24-byte nonce 130 cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce) 131 cipher.update(self.header) 132 ciphertext, tag = cipher.encrypt_and_digest(val) 133 return nonce + tag + ciphertext 134 135 def _decrypt(self, val: bytes) -> bytes: 136 """Decrypts data using ChaCha20-Poly1305.""" 137 try: 138 # Extract nonce (24), tag (16), and ciphertext 139 nonce, tag, ciphertext = val[:24], val[24:40], val[40:] 140 cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce) 141 cipher.update(self.header) 142 plaintext = cipher.decrypt_and_verify(ciphertext, tag) 143 return plaintext 144 except Exception as e: 145 # Catching a broad exception as various issues (key, tag, length) can occur. 146 raise ValueError( 147 "Decryption failed. Check key, header, or data integrity." 148 ) from e 149 150 def __eq__(self, other: object) -> bool: 151 """@private""" 152 if not isinstance(other, SimpleEncryptedZarrHAMTStore): 153 return False 154 return ( 155 self.hamt.root_node_id == other.hamt.root_node_id 156 and self.encryption_key == other.encryption_key 157 and self.header == other.header 158 ) 159 160 async def get( 161 self, 162 key: str, 163 prototype: zarr.core.buffer.BufferPrototype, 164 byte_range: zarr.abc.store.ByteRequest | None = None, 165 ) -> zarr.core.buffer.Buffer | None: 166 """@private""" 167 try: 168 decrypted_val: bytes 169 is_metadata: bool = ( 170 len(key) >= 9 and key[-9:] == "zarr.json" 171 ) # if path ends with zarr.json 172 173 if is_metadata and key in self.metadata_read_cache: 174 decrypted_val = self.metadata_read_cache[key] 175 else: 176 raw_val = cast( 177 bytes, await self.hamt.get(key) 178 ) # We know values received will always be bytes since we only store bytes in the HAMT 179 decrypted_val = self._decrypt(raw_val) 180 if is_metadata: 181 self.metadata_read_cache[key] = decrypted_val 182 return prototype.buffer.from_bytes(decrypted_val) 183 except KeyError: 184 return None 185 186 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 187 """@private""" 188 if self.read_only: 189 raise Exception("Cannot write to a read-only store.") 190 191 raw_bytes = value.to_bytes() 192 if key in self.metadata_read_cache: 193 self.metadata_read_cache[key] = raw_bytes 194 # Encrypt it 195 encrypted_bytes = self._encrypt(raw_bytes) 196 await self.hamt.set(key, encrypted_bytes)
Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.
This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.
Note: For standard Zarr encryption and decryption where metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.
#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- There is no metadata caching.
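Given the details above, each stored value is laid out as nonce, then tag, then ciphertext; a small helper illustrating that layout (not part of the class API):

```python
def split_encrypted_value(val: bytes) -> tuple[bytes, bytes, bytes]:
    # bytes [0:24)  -> XChaCha20 nonce
    # bytes [24:40) -> Poly1305 authentication tag
    # bytes [40:]   -> ciphertext
    return val[:24], val[24:40], val[40:]
```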
#### Sample Code
import xarray
from py_hamt import HAMT, KuboCAS # Assuming a KuboCAS or similar
from Crypto.Random import get_random_bytes
import numpy as np
# Setup
ds = xarray.Dataset(
{"data": (("y", "x"), np.arange(12).reshape(3, 4))},
coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
)
cas = KuboCAS() # Example ContentAddressedStore
encryption_key = get_random_bytes(32)
header = b"fully-encrypted-zarr"
# --- Write ---
hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
ezhs_write = SimpleEncryptedZarrHAMTStore(
hamt_write, False, encryption_key, header
)
print("Writing fully encrypted Zarr...")
ds.to_zarr(store=ezhs_write, mode="w")
await hamt_write.make_read_only()
root_node_id = hamt_write.root_node_id
print(f"Wrote Zarr with root: {root_node_id}")
# --- Read ---
hamt_read = await HAMT.build(
cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
)
ezhs_read = SimpleEncryptedZarrHAMTStore(
hamt_read, True, encryption_key, header
)
print("
Reading fully encrypted Zarr...")
ds_read = xarray.open_zarr(store=ezhs_read)
print("Read back dataset:")
print(ds_read)
xarray.testing.assert_identical(ds, ds_read)
print("Read successful and data verified.")
# --- Read with wrong key (demonstrates failure) ---
wrong_key = get_random_bytes(32)
hamt_bad = await HAMT.build(
cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
ezhs_bad = SimpleEncryptedZarrHAMTStore(
hamt_bad, True, wrong_key, header
)
print("
Attempting to read with wrong key...")
try:
ds_bad = xarray.open_zarr(store=ezhs_bad)
print(ds_bad)
except Exception as e:
print(f"Failed to read as expected: {type(e).__name__} - {e}")
93 def __init__( 94 self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes 95 ) -> None: 96 """ 97 Initializes the SimpleEncryptedZarrHAMTStore. 98 99 Args: 100 hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. 101 Its `read_only` status must match the `read_only` argument. 102 read_only: If True, the store is in read-only mode. 103 encryption_key: A 32-byte key for ChaCha20-Poly1305. 104 header: A header (bytes) used as associated data in encryption. 105 """ 106 super().__init__(hamt, read_only=read_only) 107 108 if len(encryption_key) != 32: 109 raise ValueError("Encryption key must be exactly 32 bytes long.") 110 self.encryption_key = encryption_key 111 self.header = header 112 self.metadata_read_cache: dict[str, bytes] = {}
Initializes the SimpleEncryptedZarrHAMTStore.
Args:
hamt: The HAMT instance for storage. Must have values_are_bytes=True.
Its read_only status must match the read_only argument.
read_only: If True, the store is in read-only mode.
encryption_key: A 32-byte key for ChaCha20-Poly1305.
header: A header (bytes) used as associated data in encryption.
114 def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore": 115 if read_only == self.read_only: 116 return self 117 118 clone = type(self).__new__(type(self)) 119 clone.hamt = self.hamt 120 clone.encryption_key = self.encryption_key 121 clone.header = self.header 122 clone.metadata_read_cache = self.metadata_read_cache 123 clone._forced_read_only = read_only # safe; attribute is declared 124 zarr.abc.store.Store.__init__(clone, read_only=read_only) 125 return clone
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same py_hamt.hamt.HAMT instance; no flushing, network traffic, or async work is done.
143class ShardedZarrStore(zarr.abc.store.Store): 144 """ 145 Implements the Zarr Store API using a sharded layout for chunk CIDs. 146 147 This store divides the flat index of chunk CIDs into multiple "shards". 148 Each shard is a DAG-CBOR array where each element is either a CID link 149 to a chunk or a null value if the chunk is empty. This structure allows 150 for efficient traversal by IPLD-aware systems. 151 152 The store's root object contains: 153 1. A dictionary mapping metadata keys (like 'zarr.json') to their CIDs. 154 2. A list of CIDs, where each CID points to a shard object. 155 3. Sharding configuration details (e.g., chunks_per_shard). 156 """ 157 158 def __init__( 159 self, 160 cas: ContentAddressedStore, 161 read_only: bool, 162 root_cid: Optional[str] = None, 163 *, 164 max_cache_memory_bytes: int = 100 * 1024 * 1024, # 100MB default 165 ): 166 """Use the async `open()` classmethod to instantiate this class.""" 167 super().__init__(read_only=read_only) 168 self.cas = cas 169 self._root_cid = root_cid 170 self._root_obj: dict 171 172 self._resize_lock = asyncio.Lock() 173 # An event to signal when a resize is in-progress. 174 # It starts in the "set" state, allowing all operations to proceed. 175 self._resize_complete = asyncio.Event() 176 self._resize_complete.set() 177 self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock) 178 179 self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes) 180 self._pending_shard_loads: Dict[int, asyncio.Event] = {} 181 182 self._array_shape: Tuple[int, ...] 183 self._chunk_shape: Tuple[int, ...] 184 self._chunks_per_dim: Tuple[int, ...] 185 self._chunks_per_shard: int 186 self._num_shards: int = 0 187 self._total_chunks: int = 0 188 189 self._dirty_root = False 190 191 def __update_geometry(self): 192 """Calculates derived geometric properties from the base shapes.""" 193 194 if not all(cs > 0 for cs in self._chunk_shape): 195 raise ValueError("All chunk_shape dimensions must be positive.") 196 if not all(s >= 0 for s in self._array_shape): 197 raise ValueError("All array_shape dimensions must be non-negative.") 198 199 self._chunks_per_dim = tuple( 200 math.ceil(a / c) if c > 0 else 0 201 for a, c in zip(self._array_shape, self._chunk_shape) 202 ) 203 self._total_chunks = math.prod(self._chunks_per_dim) 204 205 if not self._total_chunks == 0: 206 self._num_shards = ( 207 self._total_chunks + self._chunks_per_shard - 1 208 ) // self._chunks_per_shard 209 210 @classmethod 211 async def open( 212 cls, 213 cas: ContentAddressedStore, 214 read_only: bool, 215 root_cid: Optional[str] = None, 216 *, 217 array_shape: Optional[Tuple[int, ...]] = None, 218 chunk_shape: Optional[Tuple[int, ...]] = None, 219 chunks_per_shard: Optional[int] = None, 220 max_cache_memory_bytes: int = 100 * 1024 * 1024, # 100MB default 221 ) -> "ShardedZarrStore": 222 """ 223 Asynchronously opens an existing ShardedZarrStore or initializes a new one. 224 """ 225 store = cls( 226 cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes 227 ) 228 if root_cid: 229 await store._load_root_from_cid() 230 elif not read_only: 231 if array_shape is None or chunk_shape is None: 232 raise ValueError( 233 "array_shape and chunk_shape must be provided for a new store." 
234 ) 235 236 if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0: 237 raise ValueError("chunks_per_shard must be a positive integer.") 238 239 store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard) 240 else: 241 raise ValueError("root_cid must be provided for a read-only store.") 242 return store 243 244 def _initialize_new_root( 245 self, 246 array_shape: Tuple[int, ...], 247 chunk_shape: Tuple[int, ...], 248 chunks_per_shard: int, 249 ): 250 self._array_shape = array_shape 251 self._chunk_shape = chunk_shape 252 self._chunks_per_shard = chunks_per_shard 253 254 self.__update_geometry() 255 256 self._root_obj = { 257 "manifest_version": "sharded_zarr_v1", 258 "metadata": {}, 259 "chunks": { 260 "array_shape": list(self._array_shape), 261 "chunk_shape": list(self._chunk_shape), 262 "sharding_config": { 263 "chunks_per_shard": self._chunks_per_shard, 264 }, 265 "shard_cids": [None] * self._num_shards, 266 }, 267 } 268 self._dirty_root = True 269 270 async def _load_root_from_cid(self): 271 root_bytes = await self.cas.load(self._root_cid) 272 try: 273 self._root_obj = dag_cbor.decode(root_bytes) 274 if not isinstance(self._root_obj, dict) or "chunks" not in self._root_obj: 275 raise ValueError( 276 "Root object is not a valid dictionary with 'chunks' key." 277 ) 278 if not isinstance(self._root_obj["chunks"]["shard_cids"], list): 279 raise ValueError("shard_cids is not a list.") 280 except Exception as e: 281 raise ValueError(f"Failed to decode root object: {e}") 282 283 if self._root_obj.get("manifest_version") != "sharded_zarr_v1": 284 raise ValueError( 285 f"Incompatible manifest version: {self._root_obj.get('manifest_version')}. Expected 'sharded_zarr_v1'." 286 ) 287 288 chunk_info = self._root_obj["chunks"] 289 self._array_shape = tuple(chunk_info["array_shape"]) 290 self._chunk_shape = tuple(chunk_info["chunk_shape"]) 291 self._chunks_per_shard = chunk_info["sharding_config"]["chunks_per_shard"] 292 293 self.__update_geometry() 294 295 if len(chunk_info["shard_cids"]) != self._num_shards: 296 raise ValueError( 297 f"Inconsistent number of shards. Expected {self._num_shards}, found {len(chunk_info['shard_cids'])}." 298 ) 299 300 async def _fetch_and_cache_full_shard( 301 self, 302 shard_idx: int, 303 shard_cid: str, 304 max_retries: int = 3, 305 retry_delay: float = 1.0, 306 ) -> None: 307 """ 308 Fetch a shard from CAS and cache it, with retry logic for transient errors. 309 310 Args: 311 shard_idx: The index of the shard to fetch. 312 shard_cid: The CID of the shard. 313 max_retries: Maximum number of retry attempts for transient errors. 314 retry_delay: Delay between retry attempts in seconds. 
315 """ 316 for attempt in range(max_retries): 317 try: 318 shard_data_bytes = await self.cas.load(shard_cid) 319 decoded_shard = dag_cbor.decode(shard_data_bytes) 320 if not isinstance(decoded_shard, list): 321 raise TypeError(f"Shard {shard_idx} did not decode to a list.") 322 await self._shard_data_cache.put(shard_idx, decoded_shard) 323 # Always set the Event to unblock waiting coroutines 324 if shard_idx in self._pending_shard_loads: 325 self._pending_shard_loads[shard_idx].set() 326 del self._pending_shard_loads[shard_idx] 327 return # Success 328 except (ConnectionError, TimeoutError) as e: 329 # Handle transient errors (e.g., network issues) 330 if attempt < max_retries - 1: 331 await asyncio.sleep( 332 retry_delay * (2**attempt) 333 ) # Exponential backoff 334 continue 335 else: 336 raise RuntimeError( 337 f"Failed to fetch shard {shard_idx} after {max_retries} attempts: {e}" 338 ) 339 340 def _parse_chunk_key(self, key: str) -> Optional[Tuple[int, ...]]: 341 # 1. Exclude .json files immediately (metadata) 342 if key.endswith(".json"): 343 return None 344 excluded_array_prefixes = { 345 "time", 346 "lat", 347 "lon", 348 "latitude", 349 "longitude", 350 "forecast_reference_time", 351 "step", 352 } 353 354 chunk_marker = "/c/" 355 marker_idx = key.rfind(chunk_marker) # Use rfind for robustness 356 if marker_idx == -1: 357 # Key does not contain "/c/", so it's not a chunk data key 358 # in the expected format (e.g., could be .zattrs, .zgroup at various levels). 359 return None 360 361 # Extract the part of the key before "/c/", which might represent the array/group path 362 # e.g., "temp" from "temp/c/0/0/0" 363 # e.g., "group1/lat" from "group1/lat/c/0" 364 # e.g., "" if key is "c/0/0/0" (root array) 365 path_before_c = key[:marker_idx] 366 367 # Determine the actual array name (the last component of the path before "/c/") 368 actual_array_name = "" 369 if path_before_c: 370 actual_array_name = path_before_c.split("/")[-1] 371 372 # If the determined array name is in our exclusion list, return None. 373 if actual_array_name in excluded_array_prefixes: 374 return None 375 376 # The part after "/c/" contains the chunk coordinates 377 coord_part = key[marker_idx + len(chunk_marker) :] 378 parts = coord_part.split("/") 379 380 coords = tuple(map(int, parts)) 381 # Validate coordinates against the chunk grid of the store's configured array 382 for i, c_coord in enumerate(coords): 383 if not (0 <= c_coord < self._chunks_per_dim[i]): 384 raise IndexError( 385 f"Chunk coordinate {c_coord} at dimension {i} is out of bounds for dimension size {self._chunks_per_dim[i]}." 386 ) 387 return coords 388 389 def _get_linear_chunk_index(self, chunk_coords: Tuple[int, ...]) -> int: 390 linear_index = 0 391 multiplier = 1 392 # Convert N-D chunk coordinates to a flat 1-D index (row-major order) 393 for i in reversed(range(len(self._chunks_per_dim))): 394 linear_index += chunk_coords[i] * multiplier 395 multiplier *= self._chunks_per_dim[i] 396 return linear_index 397 398 def _get_shard_info(self, linear_chunk_index: int) -> Tuple[int, int]: 399 shard_idx = linear_chunk_index // self._chunks_per_shard 400 index_in_shard = linear_chunk_index % self._chunks_per_shard 401 return shard_idx, index_in_shard 402 403 async def _load_or_initialize_shard_cache( 404 self, shard_idx: int 405 ) -> List[Optional[CID]]: 406 """ 407 Load a shard into the cache or initialize an empty shard if it doesn't exist. 408 409 Args: 410 shard_idx: The index of the shard to load or initialize. 
411 412 Returns: 413 List[Optional[CID]]: The shard data (list of CIDs or None). 414 415 Raises: 416 ValueError: If the shard index is out of bounds. 417 RuntimeError: If the shard cannot be loaded or initialized. 418 """ 419 cached_shard = await self._shard_data_cache.get(shard_idx) 420 if cached_shard is not None: 421 return cached_shard 422 423 if shard_idx in self._pending_shard_loads: 424 try: 425 # Wait for the pending load with a timeout (e.g., 60 seconds) 426 await asyncio.wait_for( 427 self._pending_shard_loads[shard_idx].wait(), timeout=60.0 428 ) 429 cached_shard = await self._shard_data_cache.get(shard_idx) 430 if cached_shard is not None: 431 return cached_shard 432 else: 433 raise RuntimeError( 434 f"Shard {shard_idx} not found in cache after pending load completed." 435 ) 436 except asyncio.TimeoutError: 437 # Clean up the pending load to allow retry 438 if shard_idx in self._pending_shard_loads: 439 self._pending_shard_loads[shard_idx].set() 440 del self._pending_shard_loads[shard_idx] 441 raise RuntimeError(f"Timeout waiting for shard {shard_idx} to load.") 442 443 if not (0 <= shard_idx < self._num_shards): 444 raise ValueError(f"Shard index {shard_idx} out of bounds.") 445 446 shard_cid_obj = self._root_obj["chunks"]["shard_cids"][shard_idx] 447 if shard_cid_obj: 448 self._pending_shard_loads[shard_idx] = asyncio.Event() 449 shard_cid_str = str(shard_cid_obj) 450 await self._fetch_and_cache_full_shard(shard_idx, shard_cid_str) 451 else: 452 empty_shard = [None] * self._chunks_per_shard 453 await self._shard_data_cache.put(shard_idx, empty_shard) 454 455 result = await self._shard_data_cache.get(shard_idx) 456 if result is None: 457 raise RuntimeError(f"Failed to load or initialize shard {shard_idx}") 458 return result # type: ignore[return-value] 459 460 async def set_partial_values( 461 self, key_start_values: Iterable[Tuple[str, int, BytesLike]] 462 ) -> None: 463 raise NotImplementedError( 464 "Partial writes are not supported by ShardedZarrStore." 465 ) 466 467 async def get_partial_values( 468 self, 469 prototype: zarr.core.buffer.BufferPrototype, 470 key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]], 471 ) -> List[Optional[zarr.core.buffer.Buffer]]: 472 tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges] 473 results = await asyncio.gather(*tasks) 474 return results 475 476 def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore": 477 """ 478 Return this store (if the flag already matches) or a *shallow* 479 clone that presents the requested read‑only status. 480 481 The clone **shares** the same CAS instance and internal state; 482 no flushing, network traffic or async work is done. 
483 """ 484 # Fast path 485 if read_only == self.read_only: 486 return self # Same mode, return same instance 487 488 # Create new instance with different read_only flag 489 # Creates a *bare* instance without running its __init__ 490 clone = type(self).__new__(type(self)) 491 492 # Copy all attributes from the current instance 493 clone.cas = self.cas 494 clone._root_cid = self._root_cid 495 clone._root_obj = self._root_obj 496 497 clone._resize_lock = self._resize_lock 498 clone._resize_complete = self._resize_complete 499 clone._shard_locks = self._shard_locks 500 501 clone._shard_data_cache = self._shard_data_cache 502 clone._pending_shard_loads = self._pending_shard_loads 503 504 clone._array_shape = self._array_shape 505 clone._chunk_shape = self._chunk_shape 506 clone._chunks_per_dim = self._chunks_per_dim 507 clone._chunks_per_shard = self._chunks_per_shard 508 clone._num_shards = self._num_shards 509 clone._total_chunks = self._total_chunks 510 511 clone._dirty_root = self._dirty_root 512 513 # Re‑initialise the zarr base class so that Zarr sees the flag 514 zarr.abc.store.Store.__init__(clone, read_only=read_only) 515 return clone 516 517 def __eq__(self, other: object) -> bool: 518 if not isinstance(other, ShardedZarrStore): 519 return False 520 # For equality, root CID is primary. Config like chunks_per_shard is part of that root's identity. 521 return self._root_cid == other._root_cid 522 523 # If nothing to flush, return the root CID. 524 async def flush(self) -> str: 525 async with self._shard_data_cache._cache_lock: 526 dirty_shards = list(self._shard_data_cache._dirty_shards) 527 if dirty_shards: 528 for shard_idx in sorted(dirty_shards): 529 # Get the list of CIDs/Nones from the cache 530 shard_data_list = await self._shard_data_cache.get(shard_idx) 531 if shard_data_list is None: 532 raise RuntimeError(f"Dirty shard {shard_idx} not found in cache") 533 534 # Encode this list into a DAG-CBOR byte representation 535 shard_data_bytes = dag_cbor.encode(shard_data_list) 536 537 # Save the DAG-CBOR block and get its CID 538 new_shard_cid_obj = await self.cas.save( 539 shard_data_bytes, 540 codec="dag-cbor", # Use 'dag-cbor' codec 541 ) 542 543 if ( 544 self._root_obj["chunks"]["shard_cids"][shard_idx] 545 != new_shard_cid_obj 546 ): 547 # Store the CID object directly 548 self._root_obj["chunks"]["shard_cids"][shard_idx] = ( 549 new_shard_cid_obj 550 ) 551 self._dirty_root = True 552 # Mark shard as clean after flushing 553 await self._shard_data_cache.mark_clean(shard_idx) 554 555 if self._dirty_root: 556 # Ensure all metadata CIDs are CID objects for correct encoding 557 self._root_obj["metadata"] = { 558 k: (CID.decode(v) if isinstance(v, str) else v) 559 for k, v in self._root_obj["metadata"].items() 560 } 561 root_obj_bytes = dag_cbor.encode(self._root_obj) 562 new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor") 563 self._root_cid = str(new_root_cid) 564 self._dirty_root = False 565 566 # Ignore because root_cid will always exist after initialization or flush. 
567 return self._root_cid # type: ignore[return-value] 568 569 async def get( 570 self, 571 key: str, 572 prototype: zarr.core.buffer.BufferPrototype, 573 byte_range: Optional[zarr.abc.store.ByteRequest] = None, 574 ) -> Optional[zarr.core.buffer.Buffer]: 575 chunk_coords = self._parse_chunk_key(key) 576 # Metadata request 577 if chunk_coords is None: 578 metadata_cid_obj = self._root_obj["metadata"].get(key) 579 if metadata_cid_obj is None: 580 return None 581 if byte_range is not None: 582 raise ValueError( 583 "Byte range requests are not supported for metadata keys." 584 ) 585 data = await self.cas.load(str(metadata_cid_obj)) 586 return prototype.buffer.from_bytes(data) 587 # Chunk data request 588 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 589 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 590 591 # This will load the full shard into cache if it's not already there. 592 shard_lock = self._shard_locks[shard_idx] 593 async with shard_lock: 594 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 595 596 # Get the CID object (or None) from the cached list. 597 chunk_cid_obj = target_shard_list[index_in_shard] 598 599 if chunk_cid_obj is None: 600 return None # Chunk is empty/doesn't exist. 601 602 chunk_cid_str = str(chunk_cid_obj) 603 604 req_offset = None 605 req_length = None 606 req_suffix = None 607 608 if byte_range: 609 if isinstance(byte_range, RangeByteRequest): 610 req_offset = byte_range.start 611 if byte_range.end is not None: 612 if byte_range.start > byte_range.end: 613 raise ValueError( 614 f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})" 615 ) 616 req_length = byte_range.end - byte_range.start 617 elif isinstance(byte_range, OffsetByteRequest): 618 req_offset = byte_range.offset 619 elif isinstance(byte_range, SuffixByteRequest): 620 req_suffix = byte_range.suffix 621 data = await self.cas.load( 622 chunk_cid_str, offset=req_offset, length=req_length, suffix=req_suffix 623 ) 624 return prototype.buffer.from_bytes(data) 625 626 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 627 if self.read_only: 628 raise PermissionError("Cannot write to a read-only store.") 629 await self._resize_complete.wait() 630 631 if key.endswith("zarr.json") and not key == "zarr.json": 632 metadata_json = json.loads(value.to_bytes().decode("utf-8")) 633 new_array_shape = metadata_json.get("shape") 634 # Some metadata entries (e.g., group metadata) do not have a shape field. 635 if new_array_shape: 636 # Only resize when the metadata shape represents the primary array. 637 if ( 638 len(new_array_shape) == len(self._array_shape) 639 and tuple(new_array_shape) != self._array_shape 640 ): 641 async with self._resize_lock: 642 # Double-check after acquiring the lock, in case another task 643 # just finished this exact resize while we were waiting. 644 if ( 645 len(new_array_shape) == len(self._array_shape) 646 and tuple(new_array_shape) != self._array_shape 647 ): 648 # Block all other tasks until resize is complete. 649 self._resize_complete.clear() 650 try: 651 await self.resize_store( 652 new_shape=tuple(new_array_shape) 653 ) 654 finally: 655 # All waiting tasks will now un-pause and proceed safely. 656 self._resize_complete.set() 657 658 raw_data_bytes = value.to_bytes() 659 # Save the data to CAS first to get its CID. 660 # Metadata is often saved as 'raw', chunks as well unless compressed. 
661 try: 662 data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw") 663 await self.set_pointer(key, str(data_cid_obj)) 664 except Exception as e: 665 raise RuntimeError(f"Failed to save data for key {key}: {e}") 666 return None # type: ignore[return-value] 667 668 async def set_pointer(self, key: str, pointer: str) -> None: 669 chunk_coords = self._parse_chunk_key(key) 670 671 pointer_cid_obj = CID.decode(pointer) # Convert string to CID object 672 673 if chunk_coords is None: # Metadata key 674 self._root_obj["metadata"][key] = pointer_cid_obj 675 self._dirty_root = True 676 return None 677 678 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 679 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 680 681 shard_lock = self._shard_locks[shard_idx] 682 async with shard_lock: 683 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 684 685 if target_shard_list[index_in_shard] != pointer_cid_obj: 686 target_shard_list[index_in_shard] = pointer_cid_obj 687 await self._shard_data_cache.mark_dirty(shard_idx) 688 return None 689 690 async def exists(self, key: str) -> bool: 691 try: 692 chunk_coords = self._parse_chunk_key(key) 693 if chunk_coords is None: # Metadata 694 return key in self._root_obj.get("metadata", {}) 695 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 696 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 697 # Load shard if not cached and check the index 698 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 699 return target_shard_list[index_in_shard] is not None 700 except (ValueError, IndexError, KeyError): 701 return False 702 703 @property 704 def supports_writes(self) -> bool: 705 return not self.read_only 706 707 @property 708 def supports_partial_writes(self) -> bool: 709 return False # Each chunk CID is written atomically into a shard slot 710 711 @property 712 def supports_deletes(self) -> bool: 713 return not self.read_only 714 715 async def delete(self, key: str) -> None: 716 if self.read_only: 717 raise PermissionError("Cannot delete from a read-only store.") 718 719 chunk_coords = self._parse_chunk_key(key) 720 if chunk_coords is None: # Metadata 721 # Coordinate/metadata deletions should be idempotent for caller convenience. 
722 if self._root_obj["metadata"].pop(key, None) is not None: 723 self._dirty_root = True 724 return None 725 726 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 727 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 728 729 shard_lock = self._shard_locks[shard_idx] 730 async with shard_lock: 731 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 732 if target_shard_list[index_in_shard] is not None: 733 target_shard_list[index_in_shard] = None 734 await self._shard_data_cache.mark_dirty(shard_idx) 735 736 @property 737 def supports_listing(self) -> bool: 738 return True 739 740 async def list(self) -> AsyncIterator[str]: 741 for key in list(self._root_obj.get("metadata", {})): 742 yield key 743 744 async def list_prefix(self, prefix: str) -> AsyncIterator[str]: 745 async for key in self.list(): 746 if key.startswith(prefix): 747 yield key 748 749 async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]): 750 if self.read_only: 751 raise PermissionError("Cannot graft onto a read-only store.") 752 753 store_to_graft = await ShardedZarrStore.open( 754 cas=self.cas, read_only=True, root_cid=store_to_graft_cid 755 ) 756 source_chunk_grid = store_to_graft._chunks_per_dim 757 for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]): 758 linear_local_index = store_to_graft._get_linear_chunk_index(local_coords) 759 local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info( 760 linear_local_index 761 ) 762 # Load the source shard into its cache 763 source_shard_list = await store_to_graft._load_or_initialize_shard_cache( 764 local_shard_idx 765 ) 766 767 pointer_cid_obj = source_shard_list[index_in_local_shard] 768 if pointer_cid_obj is None: 769 continue 770 771 # Calculate global coordinates and write to the main store's index 772 global_coords = tuple( 773 c_local + c_offset 774 for c_local, c_offset in zip(local_coords, chunk_offset) 775 ) 776 linear_global_index = self._get_linear_chunk_index(global_coords) 777 global_shard_idx, index_in_global_shard = self._get_shard_info( 778 linear_global_index 779 ) 780 781 shard_lock = self._shard_locks[global_shard_idx] 782 async with shard_lock: 783 target_shard_list = await self._load_or_initialize_shard_cache( 784 global_shard_idx 785 ) 786 if target_shard_list[index_in_global_shard] != pointer_cid_obj: 787 target_shard_list[index_in_global_shard] = pointer_cid_obj 788 await self._shard_data_cache.mark_dirty(global_shard_idx) 789 790 async def resize_store(self, new_shape: Tuple[int, ...]): 791 """ 792 Resizes the store's main shard index to accommodate a new overall array shape. 793 This is a metadata-only operation on the store's root object. 794 Used when doing skeleton writes or appends via xarray where the array shape changes. 795 """ 796 if self.read_only: 797 raise PermissionError("Cannot resize a read-only store.") 798 if ( 799 # self._root_obj is None 800 self._chunk_shape is None 801 or self._chunks_per_shard is None 802 or self._array_shape is None 803 ): 804 raise RuntimeError("Store is not properly initialized for resizing.") 805 if len(new_shape) != len(self._array_shape): 806 raise ValueError( 807 "New shape must have the same number of dimensions as the old shape." 
808 ) 809 810 self._array_shape = tuple(new_shape) 811 self._chunks_per_dim = tuple( 812 math.ceil(a / c) if c > 0 else 0 813 for a, c in zip(self._array_shape, self._chunk_shape) 814 ) 815 self._total_chunks = math.prod(self._chunks_per_dim) 816 old_num_shards = self._num_shards if self._num_shards is not None else 0 817 self._num_shards = ( 818 (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard 819 if self._total_chunks > 0 820 else 0 821 ) 822 self._root_obj["chunks"]["array_shape"] = list(self._array_shape) 823 if self._num_shards > old_num_shards: 824 self._root_obj["chunks"]["shard_cids"].extend( 825 [None] * (self._num_shards - old_num_shards) 826 ) 827 elif self._num_shards < old_num_shards: 828 self._root_obj["chunks"]["shard_cids"] = self._root_obj["chunks"][ 829 "shard_cids" 830 ][: self._num_shards] 831 832 self._dirty_root = True 833 834 async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]): 835 """ 836 Resizes the Zarr metadata for a specific variable (e.g., '.json' file). 837 This does NOT change the store's main shard index. 838 """ 839 if self.read_only: 840 raise PermissionError("Cannot resize a read-only store.") 841 842 zarr_metadata_key = f"{variable_name}/zarr.json" 843 844 old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key) 845 if not old_zarr_metadata_cid: 846 raise KeyError( 847 f"Cannot find metadata for key '{zarr_metadata_key}' to resize." 848 ) 849 850 old_zarr_metadata_bytes = await self.cas.load(old_zarr_metadata_cid) 851 zarr_metadata_json = json.loads(old_zarr_metadata_bytes) 852 853 zarr_metadata_json["shape"] = list(new_shape) 854 855 new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode( 856 "utf-8" 857 ) 858 # Metadata is a raw blob of bytes 859 new_zarr_metadata_cid = await self.cas.save( 860 new_zarr_metadata_bytes, codec="raw" 861 ) 862 863 self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid 864 self._dirty_root = True 865 866 async def list_dir(self, prefix: str) -> AsyncIterator[str]: 867 seen: Set[str] = set() 868 if prefix == "": 869 async for key in self.list(): # Iterates metadata keys 870 # e.g., if key is "group1/.zgroup" or "array1/.json", first_component is "group1" or "array1" 871 # if key is ".zgroup", first_component is ".zgroup" 872 first_component = key.split("/", 1)[0] 873 if first_component not in seen: 874 seen.add(first_component) 875 yield first_component 876 else: 877 raise NotImplementedError("Listing with a prefix is not implemented yet.")
Implements the Zarr Store API using a sharded layout for chunk CIDs.
This store divides the flat index of chunk CIDs into multiple "shards". Each shard is a DAG-CBOR array where each element is either a CID link to a chunk or a null value if the chunk is empty. This structure allows for efficient traversal by IPLD-aware systems.
The store's root object contains:
- A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
- A list of CIDs, where each CID points to a shard object.
- Sharding configuration details (e.g., chunks_per_shard).
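For orientation, this is roughly what a freshly initialized root object looks like before any chunks are written. It is a sketch based on `_initialize_new_root` below; the shapes and shard count are illustrative.

```python
# Sketch of the DAG-CBOR root object produced by _initialize_new_root
# (array_shape, chunk_shape and chunks_per_shard are illustrative values).
root_obj = {
    "manifest_version": "sharded_zarr_v1",
    "metadata": {},  # maps metadata keys such as "temp/zarr.json" to their CIDs
    "chunks": {
        "array_shape": [100, 100],
        "chunk_shape": [10, 10],
        "sharding_config": {"chunks_per_shard": 25},
        # 10 x 10 = 100 chunks, 25 chunks per shard -> 4 shard slots, all empty
        "shard_cids": [None, None, None, None],
    },
}
```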
158 def __init__( 159 self, 160 cas: ContentAddressedStore, 161 read_only: bool, 162 root_cid: Optional[str] = None, 163 *, 164 max_cache_memory_bytes: int = 100 * 1024 * 1024, # 100MB default 165 ): 166 """Use the async `open()` classmethod to instantiate this class.""" 167 super().__init__(read_only=read_only) 168 self.cas = cas 169 self._root_cid = root_cid 170 self._root_obj: dict 171 172 self._resize_lock = asyncio.Lock() 173 # An event to signal when a resize is in-progress. 174 # It starts in the "set" state, allowing all operations to proceed. 175 self._resize_complete = asyncio.Event() 176 self._resize_complete.set() 177 self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock) 178 179 self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes) 180 self._pending_shard_loads: Dict[int, asyncio.Event] = {} 181 182 self._array_shape: Tuple[int, ...] 183 self._chunk_shape: Tuple[int, ...] 184 self._chunks_per_dim: Tuple[int, ...] 185 self._chunks_per_shard: int 186 self._num_shards: int = 0 187 self._total_chunks: int = 0 188 189 self._dirty_root = False
Use the async open() classmethod to instantiate this class.
210 @classmethod 211 async def open( 212 cls, 213 cas: ContentAddressedStore, 214 read_only: bool, 215 root_cid: Optional[str] = None, 216 *, 217 array_shape: Optional[Tuple[int, ...]] = None, 218 chunk_shape: Optional[Tuple[int, ...]] = None, 219 chunks_per_shard: Optional[int] = None, 220 max_cache_memory_bytes: int = 100 * 1024 * 1024, # 100MB default 221 ) -> "ShardedZarrStore": 222 """ 223 Asynchronously opens an existing ShardedZarrStore or initializes a new one. 224 """ 225 store = cls( 226 cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes 227 ) 228 if root_cid: 229 await store._load_root_from_cid() 230 elif not read_only: 231 if array_shape is None or chunk_shape is None: 232 raise ValueError( 233 "array_shape and chunk_shape must be provided for a new store." 234 ) 235 236 if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0: 237 raise ValueError("chunks_per_shard must be a positive integer.") 238 239 store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard) 240 else: 241 raise ValueError("root_cid must be provided for a read-only store.") 242 return store
Asynchronously opens an existing ShardedZarrStore or initializes a new one.
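A minimal sketch of both code paths, assuming a local Kubo node reachable with the default KuboCAS endpoints; the shapes, shard size, and root CID are illustrative placeholders.

```python
import asyncio

from py_hamt import KuboCAS, ShardedZarrStore


async def main():
    cas = KuboCAS()  # assumes a local Kubo node with the default endpoints

    # Initialize a brand-new store: geometry and shard size are required.
    store = await ShardedZarrStore.open(
        cas=cas,
        read_only=False,
        array_shape=(100, 100),
        chunk_shape=(10, 10),
        chunks_per_shard=25,
    )

    # Re-open an existing store from a previously flushed root CID.
    existing = await ShardedZarrStore.open(
        cas=cas, read_only=True, root_cid="bafy..."  # placeholder CID
    )


asyncio.run(main())
```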
460 async def set_partial_values( 461 self, key_start_values: Iterable[Tuple[str, int, BytesLike]] 462 ) -> None: 463 raise NotImplementedError( 464 "Partial writes are not supported by ShardedZarrStore." 465 )
Store values at a given key, starting at byte range_start.
Parameters
key_start_values : list[tuple[str, int, BytesLike]]
    Set of (key, range_start, value) triples. A key may occur multiple times with different range_starts; the range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key.
467 async def get_partial_values( 468 self, 469 prototype: zarr.core.buffer.BufferPrototype, 470 key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]], 471 ) -> List[Optional[zarr.core.buffer.Buffer]]: 472 tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges] 473 results = await asyncio.gather(*tasks) 474 return results
Retrieve possibly partial values from given key_ranges.
Parameters
prototype : BufferPrototype
    The prototype of the output buffer. Stores may support a default buffer prototype.
key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
    Ordered set of (key, range) pairs. A key may occur multiple times with different ranges.
Returns
List of values, in the order of the key_ranges; may contain None for missing keys.
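A sketch of fetching two whole chunks in one call, assuming a store opened as above and zarr 3's `default_buffer_prototype` helper.

```python
from zarr.core.buffer import default_buffer_prototype

# Fetch two whole chunks concurrently; pass None for "entire value".
buffers = await store.get_partial_values(
    prototype=default_buffer_prototype(),
    key_ranges=[("temp/c/0/0", None), ("temp/c/0/1", None)],
)
# Each entry is a Buffer, or None for chunks that were never written.
```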
476 def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore": 477 """ 478 Return this store (if the flag already matches) or a *shallow* 479 clone that presents the requested read‑only status. 480 481 The clone **shares** the same CAS instance and internal state; 482 no flushing, network traffic or async work is done. 483 """ 484 # Fast path 485 if read_only == self.read_only: 486 return self # Same mode, return same instance 487 488 # Create new instance with different read_only flag 489 # Creates a *bare* instance without running its __init__ 490 clone = type(self).__new__(type(self)) 491 492 # Copy all attributes from the current instance 493 clone.cas = self.cas 494 clone._root_cid = self._root_cid 495 clone._root_obj = self._root_obj 496 497 clone._resize_lock = self._resize_lock 498 clone._resize_complete = self._resize_complete 499 clone._shard_locks = self._shard_locks 500 501 clone._shard_data_cache = self._shard_data_cache 502 clone._pending_shard_loads = self._pending_shard_loads 503 504 clone._array_shape = self._array_shape 505 clone._chunk_shape = self._chunk_shape 506 clone._chunks_per_dim = self._chunks_per_dim 507 clone._chunks_per_shard = self._chunks_per_shard 508 clone._num_shards = self._num_shards 509 clone._total_chunks = self._total_chunks 510 511 clone._dirty_root = self._dirty_root 512 513 # Re‑initialise the zarr base class so that Zarr sees the flag 514 zarr.abc.store.Store.__init__(clone, read_only=read_only) 515 return clone
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same CAS instance and internal state; no flushing, network traffic or async work is done.
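A quick sketch; because the clone shares the shard cache and locks, it behaves as a read-only view of the same store rather than an independent copy.

```python
# Derive a read-only view from a read+write store opened earlier.
ro_view = store.with_read_only(True)
assert ro_view.read_only and not store.read_only
# Reads through the view hit the same shard cache; writes raise PermissionError.
```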
524 async def flush(self) -> str: 525 async with self._shard_data_cache._cache_lock: 526 dirty_shards = list(self._shard_data_cache._dirty_shards) 527 if dirty_shards: 528 for shard_idx in sorted(dirty_shards): 529 # Get the list of CIDs/Nones from the cache 530 shard_data_list = await self._shard_data_cache.get(shard_idx) 531 if shard_data_list is None: 532 raise RuntimeError(f"Dirty shard {shard_idx} not found in cache") 533 534 # Encode this list into a DAG-CBOR byte representation 535 shard_data_bytes = dag_cbor.encode(shard_data_list) 536 537 # Save the DAG-CBOR block and get its CID 538 new_shard_cid_obj = await self.cas.save( 539 shard_data_bytes, 540 codec="dag-cbor", # Use 'dag-cbor' codec 541 ) 542 543 if ( 544 self._root_obj["chunks"]["shard_cids"][shard_idx] 545 != new_shard_cid_obj 546 ): 547 # Store the CID object directly 548 self._root_obj["chunks"]["shard_cids"][shard_idx] = ( 549 new_shard_cid_obj 550 ) 551 self._dirty_root = True 552 # Mark shard as clean after flushing 553 await self._shard_data_cache.mark_clean(shard_idx) 554 555 if self._dirty_root: 556 # Ensure all metadata CIDs are CID objects for correct encoding 557 self._root_obj["metadata"] = { 558 k: (CID.decode(v) if isinstance(v, str) else v) 559 for k, v in self._root_obj["metadata"].items() 560 } 561 root_obj_bytes = dag_cbor.encode(self._root_obj) 562 new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor") 563 self._root_cid = str(new_root_cid) 564 self._dirty_root = False 565 566 # Ignore because root_cid will always exist after initialization or flush. 567 return self._root_cid # type: ignore[return-value]
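Writes only update in-memory shard lists and the root object; `flush()` encodes each dirty shard as DAG-CBOR, saves it to the CAS, rewrites the root, and returns the new root CID. A sketch of the usual write-then-publish step, assuming a store written to earlier:

```python
# Persist all dirty shards and the root object, then share the root CID.
root_cid = await store.flush()
print(f"New ShardedZarrStore root CID: {root_cid}")
```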
569 async def get( 570 self, 571 key: str, 572 prototype: zarr.core.buffer.BufferPrototype, 573 byte_range: Optional[zarr.abc.store.ByteRequest] = None, 574 ) -> Optional[zarr.core.buffer.Buffer]: 575 chunk_coords = self._parse_chunk_key(key) 576 # Metadata request 577 if chunk_coords is None: 578 metadata_cid_obj = self._root_obj["metadata"].get(key) 579 if metadata_cid_obj is None: 580 return None 581 if byte_range is not None: 582 raise ValueError( 583 "Byte range requests are not supported for metadata keys." 584 ) 585 data = await self.cas.load(str(metadata_cid_obj)) 586 return prototype.buffer.from_bytes(data) 587 # Chunk data request 588 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 589 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 590 591 # This will load the full shard into cache if it's not already there. 592 shard_lock = self._shard_locks[shard_idx] 593 async with shard_lock: 594 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 595 596 # Get the CID object (or None) from the cached list. 597 chunk_cid_obj = target_shard_list[index_in_shard] 598 599 if chunk_cid_obj is None: 600 return None # Chunk is empty/doesn't exist. 601 602 chunk_cid_str = str(chunk_cid_obj) 603 604 req_offset = None 605 req_length = None 606 req_suffix = None 607 608 if byte_range: 609 if isinstance(byte_range, RangeByteRequest): 610 req_offset = byte_range.start 611 if byte_range.end is not None: 612 if byte_range.start > byte_range.end: 613 raise ValueError( 614 f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})" 615 ) 616 req_length = byte_range.end - byte_range.start 617 elif isinstance(byte_range, OffsetByteRequest): 618 req_offset = byte_range.offset 619 elif isinstance(byte_range, SuffixByteRequest): 620 req_suffix = byte_range.suffix 621 data = await self.cas.load( 622 chunk_cid_str, offset=req_offset, length=req_length, suffix=req_suffix 623 ) 624 return prototype.buffer.from_bytes(data)
Retrieve the value associated with a given key.
Parameters
key : str
prototype : BufferPrototype
    The prototype of the output buffer. Stores may support a default buffer prototype.
byte_range : ByteRequest, optional
    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.
    - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
    - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
    - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.
Returns
Buffer
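A sketch of ranged chunk reads, assuming zarr 3's ByteRequest classes and default buffer prototype; the chunk key is illustrative.

```python
from zarr.abc.store import RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import default_buffer_prototype

proto = default_buffer_prototype()

# First 100 bytes of a chunk (the end bound is exclusive).
head = await store.get("temp/c/0/0", proto, RangeByteRequest(0, 100))

# Last 16 bytes of the same chunk.
tail = await store.get("temp/c/0/0", proto, SuffixByteRequest(16))

# Metadata keys must be fetched whole; byte ranges on them raise ValueError.
meta = await store.get("temp/zarr.json", proto)
```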
626 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 627 if self.read_only: 628 raise PermissionError("Cannot write to a read-only store.") 629 await self._resize_complete.wait() 630 631 if key.endswith("zarr.json") and not key == "zarr.json": 632 metadata_json = json.loads(value.to_bytes().decode("utf-8")) 633 new_array_shape = metadata_json.get("shape") 634 # Some metadata entries (e.g., group metadata) do not have a shape field. 635 if new_array_shape: 636 # Only resize when the metadata shape represents the primary array. 637 if ( 638 len(new_array_shape) == len(self._array_shape) 639 and tuple(new_array_shape) != self._array_shape 640 ): 641 async with self._resize_lock: 642 # Double-check after acquiring the lock, in case another task 643 # just finished this exact resize while we were waiting. 644 if ( 645 len(new_array_shape) == len(self._array_shape) 646 and tuple(new_array_shape) != self._array_shape 647 ): 648 # Block all other tasks until resize is complete. 649 self._resize_complete.clear() 650 try: 651 await self.resize_store( 652 new_shape=tuple(new_array_shape) 653 ) 654 finally: 655 # All waiting tasks will now un-pause and proceed safely. 656 self._resize_complete.set() 657 658 raw_data_bytes = value.to_bytes() 659 # Save the data to CAS first to get its CID. 660 # Metadata is often saved as 'raw', chunks as well unless compressed. 661 try: 662 data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw") 663 await self.set_pointer(key, str(data_cid_obj)) 664 except Exception as e: 665 raise RuntimeError(f"Failed to save data for key {key}: {e}") 666 return None # type: ignore[return-value]
Store a (key, value) pair.
Parameters
key : str
value : Buffer
668 async def set_pointer(self, key: str, pointer: str) -> None: 669 chunk_coords = self._parse_chunk_key(key) 670 671 pointer_cid_obj = CID.decode(pointer) # Convert string to CID object 672 673 if chunk_coords is None: # Metadata key 674 self._root_obj["metadata"][key] = pointer_cid_obj 675 self._dirty_root = True 676 return None 677 678 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 679 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 680 681 shard_lock = self._shard_locks[shard_idx] 682 async with shard_lock: 683 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 684 685 if target_shard_list[index_in_shard] != pointer_cid_obj: 686 target_shard_list[index_in_shard] = pointer_cid_obj 687 await self._shard_data_cache.mark_dirty(shard_idx) 688 return None
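`set_pointer` is the lower-level write path used by the converter and by `graft_store`: rather than uploading bytes, it records an existing CID under a chunk or metadata key. A sketch, assuming the referenced block already exists in the CAS; the key and CID are placeholders.

```python
# Point a chunk key at a block that already lives in the CAS, without re-uploading.
await store.set_pointer("temp/c/0/0", "bafy...")  # placeholder CID string
# Remember to flush() later so the updated shard and root are persisted.
```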
690 async def exists(self, key: str) -> bool: 691 try: 692 chunk_coords = self._parse_chunk_key(key) 693 if chunk_coords is None: # Metadata 694 return key in self._root_obj.get("metadata", {}) 695 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 696 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 697 # Load shard if not cached and check the index 698 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 699 return target_shard_list[index_in_shard] is not None 700 except (ValueError, IndexError, KeyError): 701 return False
Check if a key exists in the store.
Parameters
key : str
Returns
bool
707 @property 708 def supports_partial_writes(self) -> bool: 709 return False # Each chunk CID is written atomically into a shard slot
Does the store support partial writes?
715 async def delete(self, key: str) -> None: 716 if self.read_only: 717 raise PermissionError("Cannot delete from a read-only store.") 718 719 chunk_coords = self._parse_chunk_key(key) 720 if chunk_coords is None: # Metadata 721 # Coordinate/metadata deletions should be idempotent for caller convenience. 722 if self._root_obj["metadata"].pop(key, None) is not None: 723 self._dirty_root = True 724 return None 725 726 linear_chunk_index = self._get_linear_chunk_index(chunk_coords) 727 shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index) 728 729 shard_lock = self._shard_locks[shard_idx] 730 async with shard_lock: 731 target_shard_list = await self._load_or_initialize_shard_cache(shard_idx) 732 if target_shard_list[index_in_shard] is not None: 733 target_shard_list[index_in_shard] = None 734 await self._shard_data_cache.mark_dirty(shard_idx)
Remove a key from the store.
Parameters
key : str
740 async def list(self) -> AsyncIterator[str]: 741 for key in list(self._root_obj.get("metadata", {})): 742 yield key
Retrieve all keys in the store.
Returns
AsyncIterator[str]
744 async def list_prefix(self, prefix: str) -> AsyncIterator[str]: 745 async for key in self.list(): 746 if key.startswith(prefix): 747 yield key
Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.
Parameters
prefix : str
Returns
AsyncIterator[str]
749 async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]): 750 if self.read_only: 751 raise PermissionError("Cannot graft onto a read-only store.") 752 753 store_to_graft = await ShardedZarrStore.open( 754 cas=self.cas, read_only=True, root_cid=store_to_graft_cid 755 ) 756 source_chunk_grid = store_to_graft._chunks_per_dim 757 for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]): 758 linear_local_index = store_to_graft._get_linear_chunk_index(local_coords) 759 local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info( 760 linear_local_index 761 ) 762 # Load the source shard into its cache 763 source_shard_list = await store_to_graft._load_or_initialize_shard_cache( 764 local_shard_idx 765 ) 766 767 pointer_cid_obj = source_shard_list[index_in_local_shard] 768 if pointer_cid_obj is None: 769 continue 770 771 # Calculate global coordinates and write to the main store's index 772 global_coords = tuple( 773 c_local + c_offset 774 for c_local, c_offset in zip(local_coords, chunk_offset) 775 ) 776 linear_global_index = self._get_linear_chunk_index(global_coords) 777 global_shard_idx, index_in_global_shard = self._get_shard_info( 778 linear_global_index 779 ) 780 781 shard_lock = self._shard_locks[global_shard_idx] 782 async with shard_lock: 783 target_shard_list = await self._load_or_initialize_shard_cache( 784 global_shard_idx 785 ) 786 if target_shard_list[index_in_global_shard] != pointer_cid_obj: 787 target_shard_list[index_in_global_shard] = pointer_cid_obj 788 await self._shard_data_cache.mark_dirty(global_shard_idx)
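`graft_store` copies chunk pointers from another ShardedZarrStore into this one at a given offset on the chunk grid, which makes it possible to assemble one large array from independently written regional stores. A sketch with placeholder CIDs and illustrative offsets (in units of chunks, not array elements):

```python
# Assemble a larger store from two previously flushed sub-stores.
await big_store.graft_store("bafy...left", chunk_offset=(0, 0))
await big_store.graft_store("bafy...right", chunk_offset=(0, 10))
root_cid = await big_store.flush()
```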
790 async def resize_store(self, new_shape: Tuple[int, ...]): 791 """ 792 Resizes the store's main shard index to accommodate a new overall array shape. 793 This is a metadata-only operation on the store's root object. 794 Used when doing skeleton writes or appends via xarray where the array shape changes. 795 """ 796 if self.read_only: 797 raise PermissionError("Cannot resize a read-only store.") 798 if ( 799 # self._root_obj is None 800 self._chunk_shape is None 801 or self._chunks_per_shard is None 802 or self._array_shape is None 803 ): 804 raise RuntimeError("Store is not properly initialized for resizing.") 805 if len(new_shape) != len(self._array_shape): 806 raise ValueError( 807 "New shape must have the same number of dimensions as the old shape." 808 ) 809 810 self._array_shape = tuple(new_shape) 811 self._chunks_per_dim = tuple( 812 math.ceil(a / c) if c > 0 else 0 813 for a, c in zip(self._array_shape, self._chunk_shape) 814 ) 815 self._total_chunks = math.prod(self._chunks_per_dim) 816 old_num_shards = self._num_shards if self._num_shards is not None else 0 817 self._num_shards = ( 818 (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard 819 if self._total_chunks > 0 820 else 0 821 ) 822 self._root_obj["chunks"]["array_shape"] = list(self._array_shape) 823 if self._num_shards > old_num_shards: 824 self._root_obj["chunks"]["shard_cids"].extend( 825 [None] * (self._num_shards - old_num_shards) 826 ) 827 elif self._num_shards < old_num_shards: 828 self._root_obj["chunks"]["shard_cids"] = self._root_obj["chunks"][ 829 "shard_cids" 830 ][: self._num_shards] 831 832 self._dirty_root = True
Resizes the store's main shard index to accommodate a new overall array shape. This is a metadata-only operation on the store's root object. Used when doing skeleton writes or appends via xarray where the array shape changes.
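The shard index simply grows or shrinks to match the new chunk grid. A small worked example of the arithmetic the method performs, with illustrative shapes:

```python
import math

# Before: shape (100,), chunks (10,) -> 10 chunks; chunks_per_shard = 4 -> 3 shards.
# After appending along the axis to shape (130,): 13 chunks -> 4 shards,
# so one new empty (None) slot is appended to shard_cids.
array_shape, chunk_shape, chunks_per_shard = (130,), (10,), 4
chunks_per_dim = tuple(math.ceil(a / c) for a, c in zip(array_shape, chunk_shape))
total_chunks = math.prod(chunks_per_dim)
num_shards = (total_chunks + chunks_per_shard - 1) // chunks_per_shard
assert chunks_per_dim == (13,) and num_shards == 4
```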
834 async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]): 835 """ 836 Resizes the Zarr metadata for a specific variable (e.g., '.json' file). 837 This does NOT change the store's main shard index. 838 """ 839 if self.read_only: 840 raise PermissionError("Cannot resize a read-only store.") 841 842 zarr_metadata_key = f"{variable_name}/zarr.json" 843 844 old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key) 845 if not old_zarr_metadata_cid: 846 raise KeyError( 847 f"Cannot find metadata for key '{zarr_metadata_key}' to resize." 848 ) 849 850 old_zarr_metadata_bytes = await self.cas.load(old_zarr_metadata_cid) 851 zarr_metadata_json = json.loads(old_zarr_metadata_bytes) 852 853 zarr_metadata_json["shape"] = list(new_shape) 854 855 new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode( 856 "utf-8" 857 ) 858 # Metadata is a raw blob of bytes 859 new_zarr_metadata_cid = await self.cas.save( 860 new_zarr_metadata_bytes, codec="raw" 861 ) 862 863 self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid 864 self._dirty_root = True
Resizes the Zarr metadata for a specific variable (e.g., '.json' file). This does NOT change the store's main shard index.
866 async def list_dir(self, prefix: str) -> AsyncIterator[str]: 867 seen: Set[str] = set() 868 if prefix == "": 869 async for key in self.list(): # Iterates metadata keys 870 # e.g., if key is "group1/.zgroup" or "array1/.json", first_component is "group1" or "array1" 871 # if key is ".zgroup", first_component is ".zgroup" 872 first_component = key.split("/", 1)[0] 873 if first_component not in seen: 874 seen.add(first_component) 875 yield first_component 876 else: 877 raise NotImplementedError("Listing with a prefix is not implemented yet.")
Retrieve all keys and prefixes that begin with a given prefix and do not contain the character "/" after the given prefix.
Parameters
prefix : str
Returns
AsyncIterator[str]
15async def convert_hamt_to_sharded( 16 cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int 17) -> str: 18 """ 19 Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore. 20 21 Args: 22 cas: An initialized ContentAddressedStore instance (KuboCAS). 23 hamt_root_cid: The root CID of the source ZarrHAMTStore. 24 chunks_per_shard: The number of chunks to group into a single shard in the new store. 25 26 Returns: 27 The root CID of the newly created ShardedZarrStore. 28 """ 29 print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---") 30 start_time = time.perf_counter() 31 # 1. Open the source HAMT store for reading 32 print("Opening source HAMT store...") 33 hamt_ro = await HAMT.build( 34 cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True 35 ) 36 source_store = ZarrHAMTStore(hamt_ro, read_only=True) 37 source_dataset = xr.open_zarr(store=source_store, consolidated=True) 38 # 2. Introspect the source array to get its configuration 39 print("Reading metadata from source store...") 40 41 # Read the stores metadata to get array shape and chunk shape 42 data_var_name = next(iter(source_dataset.data_vars)) 43 ordered_dims = list(source_dataset[data_var_name].dims) 44 array_shape_tuple = tuple(source_dataset.sizes[dim] for dim in ordered_dims) 45 chunk_shape_tuple = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims) 46 array_shape = array_shape_tuple 47 chunk_shape = chunk_shape_tuple 48 49 # 3. Create the destination ShardedZarrStore for writing 50 print( 51 f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..." 52 ) 53 dest_store = await ShardedZarrStore.open( 54 cas=cas, 55 read_only=False, 56 array_shape=array_shape, 57 chunk_shape=chunk_shape, 58 chunks_per_shard=chunks_per_shard, 59 ) 60 61 print("Destination store initialized.") 62 63 # 4. Iterate and copy all data from source to destination 64 print("Starting data migration...") 65 count = 0 66 async for key in hamt_ro.keys(): 67 count += 1 68 # Read the raw data (metadata or chunk) from the source 69 cid: CID = await hamt_ro.get_pointer(key) 70 cid_base32_str = str(cid.encode("base32")) 71 72 # Write the exact same key-value pair to the destination. 73 await dest_store.set_pointer(key, cid_base32_str) 74 if count % 200 == 0: # pragma: no cover 75 print(f"Migrated {count} keys...") # pragma: no cover 76 77 print(f"Migration of {count} total keys complete.") 78 79 # 5. Finalize the new store by flushing it to the CAS 80 print("Flushing new store to get final root CID...") 81 new_root_cid = await dest_store.flush() 82 end_time = time.perf_counter() 83 84 print("\n--- Conversion Complete! ---") 85 print(f"Total time: {end_time - start_time:.2f} seconds") 86 print(f"New ShardedZarrStore Root CID: {new_root_cid}") 87 return new_root_cid
Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.
Args:
    cas: An initialized ContentAddressedStore instance (KuboCAS).
    hamt_root_cid: The root CID of the source ZarrHAMTStore.
    chunks_per_shard: The number of chunks to group into a single shard in the new store.

Returns:
    The root CID of the newly created ShardedZarrStore.
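A sketch of running the conversion programmatically; the HAMT root CID is a placeholder and the chunks-per-shard value is illustrative.

```python
import asyncio

from py_hamt import KuboCAS, convert_hamt_to_sharded


async def main():
    cas = KuboCAS()  # assumes a local Kubo node with the default endpoints
    new_root = await convert_hamt_to_sharded(
        cas=cas,
        hamt_root_cid="bafy...",  # root CID of the existing ZarrHAMTStore
        chunks_per_shard=1000,
    )
    print(new_root)


asyncio.run(main())
```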