py_hamt
"""Public API of the py_hamt package: HAMT core, CAS backends, and Zarr stores."""

from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .hamt_to_sharded_converter import (
    convert_hamt_to_sharded,
    sharded_converter_cli,
)
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

# Names exported by `from py_hamt import *` (order kept as originally published).
__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
    "ShardedZarrStore",
    "convert_hamt_to_sharded",
    "sharded_converter_cli",
]
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    Default hash function for the `HAMT`: blake3 producing a 32 byte digest.

    The digest is computed through the multihash `b3` codec and then unwrapped,
    so only the raw hash bytes are returned.
    """
    # 32 bytes is blake3's recommended/default size, but multihash requires the size to be given explicitly.
    return b3.unwrap(b3.digest(input_bytes, size=32))
This is the default blake3 hash function used for the HAMT, with a 32 byte hash size.
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

    Note that in read+write mode, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read only mode and then read `root_node_id`.

    These optimizations also trade off performance for memory use. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refer to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a multiple of 8 bits, since a Python bytes object can only represent data in whole bytes (8 bits each).

        Theoretically your hash only needs to be at least 1 byte long, and the number of keys whose hashes fully collide must not exceed the maximum bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to be storing Python bytes objects into the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left at its default.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, which take more time to retrieve and decode from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Only read this while the HAMT is in read-only mode; in read+write mode the value is not valid.
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read only or read/write mode.

        The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow for an async `__init__`, so this method is separately provided.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a heavy operation (it flushes the whole write buffer), so avoid
    # running it concurrently with a large number of other operations.
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            # Flush the write buffer before it is replaced by a read cache.
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid; to read it at the end of your operations, you must first call `make_read_only`.

        Calling this on a HAMT that is already in read+write mode is a no-op, so writes still buffered in memory are preserved.
        """
        async with self.lock:
            # Already writable: replacing the write store now would drop buffered nodes
            # that were never vacated (and that root_node_id may refer to).
            if not self.read_only:
                return
            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. In read+write mode it acquires the lock, so it waits for other writes to finish first.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        Re-save every node on `node_stack`, bottom-most first, and fix up the parent links so each points at its child's new ID.

        The stack is a list whose first element is the root node and whose last element is the deepest node visited. Each element is a tuple of (ID from the store, Node object).
        Any non-root node that has become empty is unlinked from its parent and removed from the stack rather than saved.
        Modifies `node_stack` in place; afterwards `node_stack[0][0]` is the new root ID.
        """
        # Iterate from n-1 down to 0, i.e. from the bottommost tree node up to the root.
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink the empty node from its parent.
                _, prev_node = node_stack[stack_index - 1]
                # Links are unique here because empty nodes are removed after every single operation.
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Safe to pop: only already-processed (higher) indices shift.
                node_stack.pop(stack_index)
                continue

            # Non-empty node: save it and record its (possibly new) ID.
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # Point the parent at the new ID; the parent itself is saved on the next iteration.
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # Encoding is skipped when `values_are_bytes` is set (not based on the value's runtime type).
    async def set(self, key: str, val: IPLDKind) -> None:
        """
        Write a key-value mapping.

        Raises:
            Exception: if the HAMT is in read-only mode.
        """
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        """
        Store `val_ptr` under `key`, splitting any overflowing bucket into a new child node.

        Acquires `self.lock` itself, so callers must not already hold it.
        """
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert.
            # This is needed if any buckets overflow and so we need to reinsert all those KVs.
            kvs_queue: list[tuple[str, IPLDKind]] = []
            kvs_queue.append((key, val_ptr))

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                # Slot index within the current node depends on the depth (stack height - 1).
                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    # A link: descend into the child node.
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    for k in bucket:
                        v_ptr = bucket[k]
                        kvs_queue.append((k, v_ptr))

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """
        Delete a key-value mapping.

        Raises:
            Exception: if the HAMT is in read-only mode.
            KeyError: if `key` is not present (the missing key is the error's argument).
        """

        # The pointer is removed from the tree directly here, so there is no _delete_pointer counterpart.
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Whether or not the key is in this bucket, it could only have been here, so stop walking.
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            if not created_change:
                # No change was made, so this key does not exist within the HAMT.
                raise KeyError(key)

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def get(
        self,
        key: str,
        offset: Optional[int] = None,
        length: Optional[int] = None,
        suffix: Optional[int] = None,
    ) -> IPLDKind:
        """
        Get a value.

        `offset`, `length` and `suffix` are forwarded unchanged to `cas.load`. When `values_are_bytes` is false, the loaded bytes are decoded with dag_cbor.

        Raises:
            KeyError: if `key` is not present.
        """
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(
            pointer, offset=offset, length=length, suffix=suffix
        )
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by Python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.

        Raises:
            KeyError: if `key` is not present.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        """Walk the tree for `key` and return its stored pointer, raising KeyError(key) if absent."""
        raw_hash: bytes = self.hash_fn(key.encode())

        current_id: IPLDKind = self.root_node_id
        current_depth: int = 0

        # Track success with a boolean rather than checking for None, because None is a valid IPLDKind value.
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id: IPLDKind = current_id
            top_node: Node = await self.node_store.load(top_id)
            map_key: int = extract_bits(raw_hash, current_depth, 8)

            # Check if this key is in one of the buckets
            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    result_ptr = bucket[key]
                    found_a_result = True
                    break

            if isinstance(item, list):
                link: IPLDKind = item[0]
                current_id = link
                current_depth += 1
                continue

            # Nowhere left to go, stop walking down the tree
            break

        if not found_a_result:
            raise KeyError(key)

        return result_ptr

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        """Depth-first traversal yielding (id, node) for every node reachable from the root."""
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(node.iter_links())

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        """Yield every key from every bucket of every node; callers handle locking."""
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count
An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.
Use this to store arbitrarily large key-value mappings in your CAS of choice.
For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.
When in read-only mode, the HAMT is both async and thread safe.
A note about memory management, read+write and read-only modes
The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.
Note that in read+write, the real root node id IS NOT VALID. You should call make_read_only() to convert to read only mode and then read root_node_id.
These optimizations also trade off performance for memory use. Use cache_size to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use cache_vacate if you are over your memory limits.
IPFS HAMT Sample Code
# Example usage; top-level `await` only works inside an async context (e.g. a coroutine run with asyncio.run).
kubo_cas = KuboCAS() # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
# root_node_id is not valid in read+write mode, so switch to read-only before reading it.
await hamt.make_read_only()
cid = hamt.root_node_id # our root node CID
print(cid)
def __init__(
    self,
    cas: ContentAddressedStore,
    hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
    root_node_id: IPLDKind | None = None,
    read_only: bool = False,
    max_bucket_size: int = 4,
    values_are_bytes: bool = False,
):
    """
    Construct a HAMT handle on top of `cas`.

    To start a brand-new, empty HAMT use `build` instead, because creating the empty root node requires async calls to the CAS. Each constructor argument is documented on the attribute with the same name.
    """

    self.cas: ContentAddressedStore = cas
    """Content addressed store holding all nodes and values. For IPFS, py-hamt ships `KuboCAS`."""

    self.hash_fn: Callable[[bytes], bytes] = hash_fn
    """
    Function that maps a key's bytes to the hash used to position it in the HAMT.

    A custom hash function must accept bytes of any length and return the hash as bytes; its output is therefore always a whole number of bytes (a multiple of 8 bits).

    The hash needs to be at least 1 byte long, and at most as many keys may share an identical full hash as the maximum bucket size allows. Exceeding that will most likely make the HAMT raise errors.
    """

    self.lock: asyncio.Lock = asyncio.Lock()
    """@private"""

    self.values_are_bytes: bool = values_are_bytes
    """When true, values are stored and returned as raw Python bytes and the IPLDKind (dag_cbor) serialization step is skipped, which is faster.

    Changing this between operations should work in principle but is untested; do so at your own risk.
    """

    bucket_limit_is_valid = max_bucket_size >= 1
    if not bucket_limit_is_valid:
        raise ValueError("Bucket size maximum must be a positive integer")
    self.max_bucket_size: int = max_bucket_size
    """
    Write-time performance knob: the maximum number of key-value pairs held in one bucket before it is split into a child node. It does not need to match the value used when a HAMT was written in order to read it.

    Each Node's memory footprint grows linearly with this value, so larger settings produce larger Nodes that take longer to fetch and decode from the backing CAS.

    Must be an integer of at least 1.
    """

    self.root_node_id: IPLDKind = root_node_id
    """
    ID of the root node (typed as IPLDKind; pdoc renders that type imperfectly).

    Only meaningful while the HAMT is in read-only mode.
    """

    self.read_only: bool = read_only
    """Read-only flag; clients should NOT modify this directly.

    In read-only mode an internal read cache speeds up lookups. In read/write mode reads take an async lock for strong consistency, and writes go to an in-memory buffer instead of (possibly) making network calls to the CAS.
    """

    self.node_store: NodeStore
    """@private"""
    # The store is created last because it receives the fully configured HAMT.
    self.node_store = ReadCacheStore(self) if read_only else InMemoryTreeStore(self)
Use build if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refer to, check the documentation with the matching names below.
The backing storage system. py-hamt provides an implementation KuboCAS for IPFS.
This is the hash function used to place a key-value within the HAMT.
To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.
It's important to note that the resulting hash must always be a multiple of 8 bits, since a Python bytes object can only represent data in whole bytes (8 bits each).
Theoretically your hash only needs to be at least 1 byte long, and the number of keys whose hashes fully collide must not exceed the maximum bucket size. Any more and the HAMT will most likely throw errors.
Set this to true if you are only going to be storing Python bytes objects into the HAMT. This will improve performance by skipping a serialization step from IPLDKind.
This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.
This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, which take more time to retrieve and decode from your backing CAS.
This must be a positive integer with a minimum of 1.
This is type IPLDKind but the documentation generator pdoc mangles it a bit.
Only read this while the HAMT is in read-only mode; in read+write mode the value is not valid.
Clients should NOT modify this.
This is here for checking whether the HAMT is in read only or read/write mode.
The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes the HAMT writes to an in memory buffer rather than performing (possibly) network calls to the underlying CAS.
@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
    """
    Asynchronously construct a HAMT, creating an empty root node when none is given.

    Accepts exactly the same arguments as `__init__`. If `root_node_id` is left as None, an empty `Node` is saved through the new instance's node store and its ID becomes the root. Otherwise, the result is the same as calling the constructor directly.

    This exists because saving the empty root node is an async operation, and Python has no async `__init__`.
    """
    instance = cls(*args, **kwargs)
    if instance.root_node_id is not None:
        return instance
    # No root supplied: persist an empty node to act as the root.
    instance.root_node_id = await instance.node_store.save(None, Node())
    return instance
Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as __init__. If the root_node_id is not None, this is no different from creating a HAMT instance with __init__.
This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow for an async __init__, so this method is separately provided.
async def make_read_only(self) -> None:
    """
    Switch the HAMT to read-only mode.

    All buffered writes are flushed first, after which `root_node_id` holds the real root ID. Read-only mode also lets reads proceed without taking the lock, whereas read+write mode serializes reads against concurrent set/delete calls for strong consistency.
    """
    async with self.lock:
        # Push every buffered write out before the write store is discarded.
        write_store: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
        await write_store.vacate()
        self.read_only = True
        self.node_store = ReadCacheStore(self)
Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
async def enable_write(self) -> None:
    """
    Enable both reads and writes.

    This installs an in-memory write buffer for performance. As a result, `root_node_id` is no longer valid until `make_read_only` is called again.

    Calling this on a HAMT that is already in read+write mode is a no-op, so writes still buffered in memory are preserved.
    """
    async with self.lock:
        # Already writable: replacing the write store now would drop buffered nodes
        # that were never vacated (and that root_node_id may refer to).
        if not self.read_only:
            return
        # The read cache has no writes that need to be sent upstream so we can remove it without vacating
        self.read_only = False
        self.node_store = InMemoryTreeStore(self)
Enable both reads and writes. This creates an internal structure for performance optimizations, which means the root node ID is no longer valid; to read it at the end of your operations, you must first call make_read_only.
async def cache_size(self) -> int:
    """
    Report the approximate memory, in bytes, used by the internal read/write cache.

    Safe to call concurrently with other async operations. In read+write mode it takes the lock, so it waits for in-progress writes first. It can be slow for large HAMTs.

    See the `HAMT` class documentation for more on memory management.
    """
    if not self.read_only:
        async with self.lock:
            return self.node_store.size()
    return self.node_store.size()
Returns the memory used by some internal performance optimization tools in bytes.
This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
Be warned that this may take a while to run for large HAMTs.
For more on memory management, see the HAMT class documentation.
async def cache_vacate(self) -> None:
    """
    Fully empty the internal read/write cache.

    This can take a while after many write operations. See the `HAMT` class documentation for more on memory management.
    """
    if not self.read_only:
        # Writers may be mid-operation; wait for the lock before flushing.
        async with self.lock:
            await self.node_store.vacate()
        return
    await self.node_store.vacate()
Vacate and completely empty out the internal read/write cache.
Be warned that this may take a while if there have been a lot of write operations.
For more on memory management, see the HAMT class documentation.
async def set(self, key: str, val: IPLDKind) -> None:
    """
    Write a key-value mapping.

    The value is saved to the CAS (raw codec), and its pointer is then recorded in the tree under `key`.

    Raises:
        Exception: if the HAMT is in read-only mode.
    """
    if self.read_only:
        raise Exception("Cannot call set on a read only HAMT")

    # With values_are_bytes, the value is stored as-is (non-bytes values will fail downstream);
    # otherwise it is dag_cbor-encoded first.
    payload: bytes = cast(bytes, val) if self.values_are_bytes else dag_cbor.encode(val)
    value_ptr: IPLDKind = await self.cas.save(payload, codec="raw")
    await self._set_pointer(key, value_ptr)
Write a key-value mapping.
async def delete(self, key: str) -> None:
    """
    Delete a key-value mapping.

    Raises:
        Exception: if the HAMT is in read-only mode.
        KeyError: if `key` is not present (the missing key is the error's argument).
    """

    # The pointer is removed from the tree directly here, so there is no _delete_pointer counterpart.
    if self.read_only:
        raise Exception("Cannot call delete on a read only HAMT")

    async with self.lock:
        raw_hash: bytes = self.hash_fn(key.encode())

        node_stack: list[tuple[IPLDKind, Node]] = []
        root_node: Node = await self.node_store.load(self.root_node_id)
        node_stack.append((self.root_node_id, root_node))

        created_change: bool = False
        while True:
            _, top_node = node_stack[-1]
            map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    del bucket[key]
                    created_change = True
                # Whether or not the key is in this bucket, it could only have been here, so stop walking.
                break
            elif isinstance(item, list):
                link: IPLDKind = item[0]
                next_node: Node = await self.node_store.load(link)
                node_stack.append((link, next_node))

        if not created_change:
            # No change was made, so this key does not exist within the HAMT.
            raise KeyError(key)

        # Finally, reserialize and fix all links, deleting empty nodes as needed
        await self._reserialize_and_link(node_stack)
        self.root_node_id = node_stack[0][0]
Delete a key-value mapping. Raises `KeyError` if the key is not present, and an exception if the HAMT is read-only.
594 async def get( 595 self, 596 key: str, 597 offset: Optional[int] = None, 598 length: Optional[int] = None, 599 suffix: Optional[int] = None, 600 ) -> IPLDKind: 601 """Get a value.""" 602 pointer: IPLDKind = await self.get_pointer(key) 603 data: bytes = await self.cas.load( 604 pointer, offset=offset, length=length, suffix=suffix 605 ) 606 if self.values_are_bytes: 607 return data 608 else: 609 return dag_cbor.decode(data)
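Get a value. `offset`, `length` and `suffix` are passed through to the CAS `load` so that a byte range can be read. The result is returned as raw bytes when `values_are_bytes` is set, and is dag-cbor decoded otherwise.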
611 async def get_pointer(self, key: str) -> IPLDKind: 612 """ 613 Get a store ID that points to the value for this key. 614 615 This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example. 616 """ 617 # If read only, no need to acquire a lock 618 pointer: IPLDKind 619 if self.read_only: 620 pointer = await self._get_pointer(key) 621 else: 622 async with self.lock: 623 pointer = await self._get_pointer(key) 624 625 return pointer
Get a store ID that points to the value for this key.
This is useful for some applications that want to implement a read cache. Due to the restrictions of ContentAddressedStore on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in ZarrHAMTStore for example.
674 async def keys(self) -> AsyncIterator[str]: 675 """ 676 AsyncIterator returning all keys in the HAMT. 677 678 If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed. 679 680 When the HAMT is in read only mode however, this can be run concurrently with get operations. 681 """ 682 if self.read_only: 683 async for k in self._keys_no_locking(): 684 yield k 685 else: 686 async with self.lock: 687 async for k in self._keys_no_locking(): 688 yield k
AsyncIterator returning all keys in the HAMT.
If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
When the HAMT is in read only mode however, this can be run concurrently with get operations.
696 async def len(self) -> int: 697 """ 698 Return the number of key value mappings in this HAMT. 699 700 When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations. 701 """ 702 count: int = 0 703 async for _ in self.keys(): 704 count += 1 705 706 return count
Return the number of key value mappings in this HAMT.
When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length is fully done being calculated. If read only, then this can be run concurrently with other operations.
14class ContentAddressedStore(ABC): 15 """ 16 Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data. 17 18 Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately. 19 20 #### A note on the IPLDKind return types 21 Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions: 22 1. No lists or dicts, since python does not classify these as immutable. 23 2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized. 24 """ 25 26 CodecInput = Literal["raw", "dag-cbor"] 27 28 @abstractmethod 29 async def save(self, data: bytes, codec: CodecInput) -> IPLDKind: 30 """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type. 31 32 `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model. 33 """ 34 35 @abstractmethod 36 async def load( 37 self, 38 id: IPLDKind, 39 offset: Optional[int] = None, 40 length: Optional[int] = None, 41 suffix: Optional[int] = None, 42 ) -> bytes: 43 """Retrieve data.""" 44 45 async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None: 46 """Pin a CID in the storage.""" 47 pass # pragma: no cover 48 49 async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None: 50 """Unpin a CID in the storage.""" 51 pass # pragma: no cover 52 53 async def pin_update( 54 self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str 55 ) -> None: 56 """Update the pinned CID in the storage.""" 57 pass # pragma: no cover 58 59 async def pin_ls(self, target_rpc: str) -> list[Dict[str, Any]]: 60 """List all pinned CIDs in the storage.""" 61 return [] # pragma: no cover
Abstract class that represents a content addressed storage that the HAMT can use for keeping data.
Note that the return type of save and input to load is really type IPLDKind, but the documentation generator pdoc mangles it unfortunately.
A note on the IPLDKind return types
Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
- No lists or dicts, since Python does not treat these as immutable (they are unhashable).
- No `None` values, since `None` is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
28 @abstractmethod 29 async def save(self, data: bytes, codec: CodecInput) -> IPLDKind: 30 """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type. 31 32 `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model. 33 """
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
`codec` is set to "dag-cbor" if this data should be marked as linked data à la the IPLD data model, and to "raw" otherwise.
35 @abstractmethod 36 async def load( 37 self, 38 id: IPLDKind, 39 offset: Optional[int] = None, 40 length: Optional[int] = None, 41 suffix: Optional[int] = None, 42 ) -> bytes: 43 """Retrieve data."""
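Retrieve the data stored under `id`. If `offset` is given, return the bytes starting at `offset`, limited to `length` bytes when `length` is also given. Otherwise, if `suffix` is given, return the last `suffix` bytes. With none of them, return the full data.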
45 async def pin_cid(self, id: IPLDKind, target_rpc: str) -> None: 46 """Pin a CID in the storage.""" 47 pass # pragma: no cover
Pin a CID in the storage.
49 async def unpin_cid(self, id: IPLDKind, target_rpc: str) -> None: 50 """Unpin a CID in the storage.""" 51 pass # pragma: no cover
Unpin a CID in the storage.
53 async def pin_update( 54 self, old_id: IPLDKind, new_id: IPLDKind, target_rpc: str 55 ) -> None: 56 """Update the pinned CID in the storage.""" 57 pass # pragma: no cover
Update the pinned CID in the storage.
64class InMemoryCAS(ContentAddressedStore): 65 """Used mostly for faster testing, this is why this is not exported. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in.""" 66 67 store: dict[bytes, bytes] 68 hash_alg: Multihash 69 70 def __init__(self): 71 self.store = dict() 72 self.hash_alg = multihash.get("blake3") 73 74 async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes: 75 hash: bytes = self.hash_alg.digest(data, size=32) 76 self.store[hash] = data 77 return hash 78 79 async def load( 80 self, 81 id: IPLDKind, 82 offset: Optional[int] = None, 83 length: Optional[int] = None, 84 suffix: Optional[int] = None, 85 ) -> bytes: 86 """ 87 `ContentAddressedStore` allows any IPLD scalar key. For the in-memory 88 backend we *require* a `bytes` hash; anything else is rejected at run 89 time. In OO type-checking, a subclass may widen (make more general) argument types, 90 but it must never narrow them; otherwise callers that expect the base-class contract can break. 91 Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. 92 This is why we use `cast` here, to tell mypy that we know what we are doing. 
93 h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch 94 """ 95 key = cast(bytes, id) 96 if not isinstance(key, (bytes, bytearray)): # defensive guard 97 raise TypeError( 98 f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}" 99 ) 100 data: bytes 101 try: 102 data = self.store[key] 103 except KeyError as exc: 104 raise KeyError("Object not found in in-memory store") from exc 105 106 if offset is not None: 107 start = offset 108 if length is not None: 109 end = start + length 110 return data[start:end] 111 else: 112 return data[start:] 113 elif suffix is not None: # If only length is given, assume start from 0 114 return data[-suffix:] 115 else: # Full load 116 return data
Used mostly for faster testing. It hashes all inputs (a 32-byte blake3 multihash digest) and uses that hash as a key to an in-memory Python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in.
74 async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes: 75 hash: bytes = self.hash_alg.digest(data, size=32) 76 self.store[hash] = data 77 return hash
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
`codec` is set to "dag-cbor" if this data should be marked as linked data à la the IPLD data model, and to "raw" otherwise.
79 async def load( 80 self, 81 id: IPLDKind, 82 offset: Optional[int] = None, 83 length: Optional[int] = None, 84 suffix: Optional[int] = None, 85 ) -> bytes: 86 """ 87 `ContentAddressedStore` allows any IPLD scalar key. For the in-memory 88 backend we *require* a `bytes` hash; anything else is rejected at run 89 time. In OO type-checking, a subclass may widen (make more general) argument types, 90 but it must never narrow them; otherwise callers that expect the base-class contract can break. 91 Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. 92 This is why we use `cast` here, to tell mypy that we know what we are doing. 93 h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch 94 """ 95 key = cast(bytes, id) 96 if not isinstance(key, (bytes, bytearray)): # defensive guard 97 raise TypeError( 98 f"InMemoryCAS only supports byte‐hash keys; got {type(id).__name__}" 99 ) 100 data: bytes 101 try: 102 data = self.store[key] 103 except KeyError as exc: 104 raise KeyError("Object not found in in-memory store") from exc 105 106 if offset is not None: 107 start = offset 108 if length is not None: 109 end = start + length 110 return data[start:end] 111 else: 112 return data[start:] 113 elif suffix is not None: # If only length is given, assume start from 0 114 return data[-suffix:] 115 else: # Full load 116 return data
ContentAddressedStore allows any IPLD scalar key. For the in-memory
backend we require a bytes hash; anything else is rejected at run
time. In OO type-checking, a subclass may widen (make more general) argument types,
but it must never narrow them; otherwise callers that expect the base-class contract can break.
Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
This is why we use cast here, to tell mypy that we know what we are doing.
h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
119class KuboCAS(ContentAddressedStore): 120 """ 121 Connects to an **IPFS Kubo** daemon. 122 123 The IDs in save and load are IPLD CIDs. 124 125 * **save()** → RPC (`/api/v0/add`) 126 * **load()** → HTTP gateway (`/ipfs/{cid}`) 127 128 `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use. 129 130 ### Authentication / custom headers 131 You have two options: 132 133 1. **Bring your own `httpx.AsyncClient`** 134 Pass it via `client=...` — any default headers or auth 135 configured on that client are reused for **every** request. 136 2. **Let `KuboCAS` build the client** but pass 137 `headers=` *and*/or `auth=` kwargs; they are forwarded to the 138 internally–created `AsyncClient`. 139 140 ```python 141 import httpx 142 from py_hamt import KuboCAS 143 144 # Option 1: user-supplied client 145 client = httpx.AsyncClient( 146 headers={"Authorization": "Bearer <token>"}, 147 auth=("user", "pass"), 148 ) 149 cas = KuboCAS(client=client) 150 151 # Option 2: let KuboCAS create the client 152 cas = KuboCAS( 153 headers={"X-My-Header": "yes"}, 154 auth=("user", "pass"), 155 ) 156 ``` 157 158 ### Parameters 159 - **hasher** (str): multihash name (defaults to *blake3*). 160 - **client** (`httpx.AsyncClient | None`): reuse an existing 161 client; if *None* KuboCAS will create one lazily. 162 - **headers** (dict[str, str] | None): default headers for the 163 internally-created client. 164 - **auth** (`tuple[str, str] | None`): authentication tuple (username, password) 165 for the internally-created client. 166 - **rpc_base_url / gateway_base_url** (str | None): override daemon 167 endpoints (defaults match the local daemon ports). 168 - **chunker** (str): chunking algorithm specification for Kubo's `add` 169 RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or 170 `"rabin-<min>-<avg>-<max>"`. 171 172 ... 
173 """ 174 175 KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080" 176 KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001" 177 178 DAG_PB_MARKER: int = 0x70 179 """@private""" 180 181 # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon 182 def __init__( 183 self, 184 hasher: str = "blake3", 185 client: httpx.AsyncClient | None = None, 186 rpc_base_url: str | None = None, 187 gateway_base_url: str | None = None, 188 concurrency: int = 32, 189 *, 190 headers: dict[str, str] | None = None, 191 auth: Tuple[str, str] | None = None, 192 pin_on_add: bool = False, 193 chunker: str = "size-1048576", 194 max_retries: int = 3, 195 initial_delay: float = 1.0, 196 backoff_factor: float = 2.0, 197 ): 198 """ 199 If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all. 200 201 ### `httpx.AsyncClient` Management 202 If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()` 203 as a class instance cannot know when it will no longer be in use, unless explicitly told to do so. 204 205 If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below: 206 ```python 207 async with httpx.AsyncClient() as client, KuboCAS( 208 rpc_base_url=rpc_base_url, 209 gateway_base_url=gateway_base_url, 210 client=client, 211 ) as kubo_cas: 212 hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True) 213 zhs = ZarrHAMTStore(hamt) 214 # Use the KuboCAS instance as needed 215 # ... 216 ``` 217 As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up. 
218 ``` python 219 cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url) 220 # Use the KuboCAS instance as needed 221 # ... 222 await cas.aclose() # Ensure resources are cleaned up 223 ``` 224 225 ### Authenticated RPC/Gateway Access 226 Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in. 227 Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided. 228 If you do not need authentication, you can leave these parameters as `None`. 229 230 ### RPC and HTTP Gateway Base URLs 231 These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided. 232 """ 233 234 self._owns_client: bool = False 235 self._closed: bool = True 236 self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {} 237 self._default_headers = headers 238 self._default_auth = auth 239 240 # Now, perform validation that might raise an exception 241 chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)" 242 if re.fullmatch(chunker_pattern, chunker) is None: 243 raise ValueError("Invalid chunker specification") 244 self.chunker: str = chunker 245 246 self.hasher: str = hasher 247 """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. 
The default blake3 follows the default hashing algorithm used by HAMT.""" 248 249 if rpc_base_url is None: 250 rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL # pragma 251 if gateway_base_url is None: 252 gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL 253 254 if "/ipfs/" in gateway_base_url: 255 gateway_base_url = gateway_base_url.split("/ipfs/")[0] 256 257 # Standard gateway URL construction with proper path handling 258 if gateway_base_url.endswith("/"): 259 gateway_base_url = f"{gateway_base_url}ipfs/" 260 else: 261 gateway_base_url = f"{gateway_base_url}/ipfs/" 262 263 pin_string: str = "true" if pin_on_add else "false" 264 self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}" 265 """@private""" 266 self.gateway_base_url: str = gateway_base_url 267 """@private""" 268 269 if client is not None: 270 # A client was supplied by the user. We don't own it. 271 self._owns_client = False 272 self._client_per_loop = {asyncio.get_running_loop(): client} 273 else: 274 # No client supplied. We will own any clients we create. 
275 self._owns_client = True 276 self._client_per_loop = {} 277 278 # store for later use by _loop_client() 279 self._default_headers = headers 280 self._default_auth = auth 281 282 self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency) 283 self._closed = False 284 285 # Validate retry parameters 286 if max_retries < 0: 287 raise ValueError("max_retries must be non-negative") 288 if initial_delay <= 0: 289 raise ValueError("initial_delay must be positive") 290 if backoff_factor < 1.0: 291 raise ValueError("backoff_factor must be >= 1.0 for exponential backoff") 292 293 self.max_retries = max_retries 294 self.initial_delay = initial_delay 295 self.backoff_factor = backoff_factor 296 297 # --------------------------------------------------------------------- # 298 # helper: get or create the client bound to the current running loop # 299 # --------------------------------------------------------------------- # 300 def _loop_client(self) -> httpx.AsyncClient: 301 """Get or create a client for the current event loop. 302 303 If the instance was previously closed but owns its clients, a fresh 304 client mapping is lazily created on demand. Users that supplied their 305 own ``httpx.AsyncClient`` still receive an error when the instance has 306 been closed, as we cannot safely recreate their client. 307 """ 308 if self._closed: 309 if not self._owns_client: 310 raise RuntimeError("KuboCAS is closed; create a new instance") 311 # We previously closed all internally-owned clients. Reset the 312 # state so that new clients can be created lazily. 
313 self._closed = False 314 self._client_per_loop = {} 315 316 loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() 317 try: 318 return self._client_per_loop[loop] 319 except KeyError: 320 # Create a new client 321 client = httpx.AsyncClient( 322 timeout=60.0, 323 headers=self._default_headers, 324 auth=self._default_auth, 325 limits=httpx.Limits(max_connections=64, max_keepalive_connections=32), 326 # Uncomment when they finally support Robust HTTP/2 GOAWAY responses 327 # http2=True, 328 ) 329 self._client_per_loop[loop] = client 330 return client 331 332 # --------------------------------------------------------------------- # 333 # graceful shutdown: close **all** clients we own # 334 # --------------------------------------------------------------------- # 335 async def aclose(self) -> None: 336 """ 337 Closes all internally-created clients. Must be called from an async context. 338 """ 339 if self._owns_client is False: # external client → caller closes 340 return 341 342 # This method is async, so we can reliably await the async close method. 343 # The complex sync/async logic is handled by __del__. 
344 for client in list(self._client_per_loop.values()): 345 if not client.is_closed: 346 try: 347 await client.aclose() 348 except Exception: 349 pass # best-effort cleanup 350 351 self._client_per_loop.clear() 352 self._closed = True 353 354 # At this point, _client_per_loop should be empty or only contain 355 # clients from loops we haven't seen (which shouldn't happen in practice) 356 async def __aenter__(self) -> "KuboCAS": 357 return self 358 359 async def __aexit__(self, *exc: Any) -> None: 360 await self.aclose() 361 362 def __del__(self) -> None: 363 """Best-effort close for internally-created clients.""" 364 if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"): 365 return 366 367 if not self._owns_client or self._closed: 368 return 369 370 # Attempt proper cleanup if possible 371 try: 372 loop = asyncio.get_running_loop() 373 except RuntimeError: 374 # No running loop - can't do async cleanup 375 # Just clear the client references synchronously 376 if hasattr(self, "_client_per_loop"): 377 # We can't await client.aclose() without a loop, 378 # so just clear the references 379 self._client_per_loop.clear() 380 self._closed = True 381 return 382 383 # If we get here, we have a running loop 384 try: 385 if loop.is_running(): 386 # Schedule cleanup in the existing loop 387 loop.create_task(self.aclose()) 388 else: 389 # Loop exists but not running - try asyncio.run 390 coro = self.aclose() # Create the coroutine 391 try: 392 asyncio.run(coro) 393 except Exception: 394 # If asyncio.run fails, we need to close the coroutine properly 395 coro.close() # This prevents the RuntimeWarning 396 raise # Re-raise to hit the outer except block 397 except Exception: 398 # If all else fails, just clear references 399 if hasattr(self, "_client_per_loop"): 400 self._client_per_loop.clear() 401 self._closed = True 402 403 # --------------------------------------------------------------------- # 404 # save() – now uses the per-loop client # 405 # 
--------------------------------------------------------------------- # 406 async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID: 407 async with self._sem: 408 files = {"file": data} 409 client = self._loop_client() 410 retry_count = 0 411 412 while retry_count <= self.max_retries: 413 try: 414 response = await client.post( 415 self.rpc_url, files=files, timeout=60.0 416 ) 417 response.raise_for_status() 418 cid_str: str = response.json()["Hash"] 419 cid: CID = CID.decode(cid_str) 420 if cid.codec.code != self.DAG_PB_MARKER: 421 cid = cid.set(codec=codec) 422 return cid 423 424 except (httpx.TimeoutException, httpx.RequestError) as e: 425 retry_count += 1 426 if retry_count > self.max_retries: 427 raise httpx.TimeoutException( 428 f"Failed to save data after {self.max_retries} retries: {str(e)}", 429 request=e.request 430 if isinstance(e, httpx.RequestError) 431 else None, 432 ) 433 434 # Calculate backoff delay 435 delay = self.initial_delay * ( 436 self.backoff_factor ** (retry_count - 1) 437 ) 438 # Add some jitter to prevent thundering herd 439 jitter = delay * 0.1 * (random.random() - 0.5) 440 await asyncio.sleep(delay + jitter) 441 442 except httpx.HTTPStatusError: 443 # Re-raise non-timeout HTTP errors immediately 444 raise 445 raise RuntimeError("Exited the retry loop unexpectedly.") # pragma: no cover 446 447 async def load( 448 self, 449 id: IPLDKind, 450 offset: Optional[int] = None, 451 length: Optional[int] = None, 452 suffix: Optional[int] = None, 453 ) -> bytes: 454 """Load data from a CID using the IPFS gateway with optional Range requests.""" 455 cid = cast(CID, id) 456 url: str = f"{self.gateway_base_url + str(cid)}" 457 headers: Dict[str, str] = {} 458 459 # Construct the Range header if required 460 if offset is not None: 461 start = offset 462 if length is not None: 463 # Standard HTTP Range: bytes=start-end (inclusive) 464 end = start + length - 1 465 headers["Range"] = f"bytes={start}-{end}" 466 else: 467 # 
Standard HTTP Range: bytes=start- (from start to end) 468 headers["Range"] = f"bytes={start}-" 469 elif suffix is not None: 470 # Standard HTTP Range: bytes=-N (last N bytes) 471 headers["Range"] = f"bytes=-{suffix}" 472 473 async with self._sem: # Throttle gateway 474 client = self._loop_client() 475 retry_count = 0 476 477 while retry_count <= self.max_retries: 478 try: 479 response = await client.get( 480 url, headers=headers or None, timeout=60.0 481 ) 482 response.raise_for_status() 483 return response.content 484 485 except (httpx.TimeoutException, httpx.RequestError) as e: 486 retry_count += 1 487 if retry_count > self.max_retries: 488 raise httpx.TimeoutException( 489 f"Failed to load data after {self.max_retries} retries: {str(e)}", 490 request=e.request 491 if isinstance(e, httpx.RequestError) 492 else None, 493 ) 494 495 # Calculate backoff delay with jitter 496 delay = self.initial_delay * ( 497 self.backoff_factor ** (retry_count - 1) 498 ) 499 jitter = delay * 0.1 * (random.random() - 0.5) 500 await asyncio.sleep(delay + jitter) 501 502 except httpx.HTTPStatusError: 503 # Re-raise non-timeout HTTP errors immediately 504 raise 505 506 raise RuntimeError("Exited the retry loop unexpectedly.") # pragma: no cover 507 508 # --------------------------------------------------------------------- # 509 # pin_cid() – method to pin a CID # 510 # --------------------------------------------------------------------- # 511 async def pin_cid( 512 self, 513 cid: CID, 514 target_rpc: str = "http://127.0.0.1:5001", 515 ) -> None: 516 """ 517 Pins a CID to the local Kubo node via the RPC API. 518 519 This call is recursive by default, pinning all linked objects. 520 521 Args: 522 cid (CID): The Content ID to pin. 523 target_rpc (str): The RPC URL of the Kubo node. 
524 """ 525 params = {"arg": str(cid), "recursive": "true"} 526 pin_add_url_base: str = f"{target_rpc}/api/v0/pin/add" 527 528 async with self._sem: # throttle RPC 529 client = self._loop_client() 530 response = await client.post(pin_add_url_base, params=params) 531 response.raise_for_status() 532 533 async def unpin_cid( 534 self, cid: CID, target_rpc: str = "http://127.0.0.1:5001" 535 ) -> None: 536 """ 537 Unpins a CID from the local Kubo node via the RPC API. 538 539 Args: 540 cid (CID): The Content ID to unpin. 541 """ 542 params = {"arg": str(cid), "recursive": "true"} 543 unpin_url_base: str = f"{target_rpc}/api/v0/pin/rm" 544 async with self._sem: # throttle RPC 545 client = self._loop_client() 546 response = await client.post(unpin_url_base, params=params) 547 response.raise_for_status() 548 549 async def pin_update( 550 self, 551 old_id: IPLDKind, 552 new_id: IPLDKind, 553 target_rpc: str = "http://127.0.0.1:5001", 554 ) -> None: 555 """ 556 Updates the pinned CID in the storage. 557 558 Args: 559 old_id (IPLDKind): The old Content ID to replace. 560 new_id (IPLDKind): The new Content ID to pin. 561 """ 562 params = {"arg": [str(old_id), str(new_id)]} 563 pin_update_url_base: str = f"{target_rpc}/api/v0/pin/update" 564 async with self._sem: # throttle RPC 565 client = self._loop_client() 566 response = await client.post(pin_update_url_base, params=params) 567 response.raise_for_status() 568 569 async def pin_ls( 570 self, target_rpc: str = "http://127.0.0.1:5001" 571 ) -> list[Dict[str, Any]]: 572 """ 573 Lists all pinned CIDs on the local Kubo node via the RPC API. 574 575 Args: 576 target_rpc (str): The RPC URL of the Kubo node. 577 578 Returns: 579 List[CID]: A list of pinned CIDs. 
580 """ 581 pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls" 582 async with self._sem: # throttle RPC 583 client = self._loop_client() 584 response = await client.post(pin_ls_url_base) 585 response.raise_for_status() 586 pins = response.json().get("Keys", []) 587 return pins
Connects to an IPFS Kubo daemon.
The IDs in save and load are IPLD CIDs.
- save() → RPC (`/api/v0/add`)
- load() → HTTP gateway (`/ipfs/{cid}`)
save uses the RPC API and load uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
Authentication / custom headers
You have two options:
- Bring your own `httpx.AsyncClient`: pass it via `client=...`. Any default headers or auth configured on that client are reused for every request.
- Let `KuboCAS` build the client, but pass `headers=` and/or `auth=` kwargs. They are forwarded to the internally-created `AsyncClient`.
import httpx
from py_hamt import KuboCAS
# Option 1: user-supplied client
client = httpx.AsyncClient(
headers={"Authorization": "Bearer <token>"},
auth=("user", "pass"),
)
cas = KuboCAS(client=client)
# Option 2: let KuboCAS create the client
cas = KuboCAS(
headers={"X-My-Header": "yes"},
auth=("user", "pass"),
)
Parameters
- hasher (str): multihash name (defaults to blake3).
- client (`httpx.AsyncClient | None`): reuse an existing client; if None, KuboCAS will create one lazily.
- headers (`dict[str, str] | None`): default headers for the internally-created client.
- auth (`tuple[str, str] | None`): authentication tuple (username, password) for the internally-created client.
- rpc_base_url / gateway_base_url (`str | None`): override daemon endpoints (defaults match the local daemon ports).
- chunker (str): chunking algorithm specification for Kubo's `add` RPC (defaults to `"size-1048576"`). Accepted formats are `"size-<positive int>"`, `"rabin"`, or `"rabin-<min>-<avg>-<max>"`. Any other value raises `ValueError`.
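- concurrency (int): maximum number of concurrent save/load/pin operations on this instance (default 32). All of them share one `asyncio.Semaphore` of this size.
- pin_on_add (bool): whether content is pinned when it is added via `save` (default False).
- max_retries (int): how many times `save`/`load` retry after a timeout or request error (default 3; must be non-negative). Once retries are exhausted, `httpx.TimeoutException` is raised, even if the underlying error was not a timeout. HTTP status errors are re-raised immediately without retrying.
- initial_delay (float): seconds to wait before the first retry (default 1.0; must be positive).
- backoff_factor (float): multiplier applied to the delay after each retry (default 2.0; must be >= 1.0). Each delay gets up to ±5% random jitter.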
182 def __init__( 183 self, 184 hasher: str = "blake3", 185 client: httpx.AsyncClient | None = None, 186 rpc_base_url: str | None = None, 187 gateway_base_url: str | None = None, 188 concurrency: int = 32, 189 *, 190 headers: dict[str, str] | None = None, 191 auth: Tuple[str, str] | None = None, 192 pin_on_add: bool = False, 193 chunker: str = "size-1048576", 194 max_retries: int = 3, 195 initial_delay: float = 1.0, 196 backoff_factor: float = 2.0, 197 ): 198 """ 199 If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all. 200 201 ### `httpx.AsyncClient` Management 202 If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()` 203 as a class instance cannot know when it will no longer be in use, unless explicitly told to do so. 204 205 If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited which is what we suggest below: 206 ```python 207 async with httpx.AsyncClient() as client, KuboCAS( 208 rpc_base_url=rpc_base_url, 209 gateway_base_url=gateway_base_url, 210 client=client, 211 ) as kubo_cas: 212 hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True) 213 zhs = ZarrHAMTStore(hamt) 214 # Use the KuboCAS instance as needed 215 # ... 216 ``` 217 As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up. 218 ``` python 219 cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url) 220 # Use the KuboCAS instance as needed 221 # ... 
222 await cas.aclose() # Ensure resources are cleaned up 223 ``` 224 225 ### Authenticated RPC/Gateway Access 226 Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in. 227 Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided. 228 If you do not need authentication, you can leave these parameters as `None`. 229 230 ### RPC and HTTP Gateway Base URLs 231 These are the first part of the url, defaults that refer to the default that kubo launches with on a local machine are provided. 232 """ 233 234 self._owns_client: bool = False 235 self._closed: bool = True 236 self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {} 237 self._default_headers = headers 238 self._default_auth = auth 239 240 # Now, perform validation that might raise an exception 241 chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)" 242 if re.fullmatch(chunker_pattern, chunker) is None: 243 raise ValueError("Invalid chunker specification") 244 self.chunker: str = chunker 245 246 self.hasher: str = hasher 247 """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. 
The default blake3 follows the default hashing algorithm used by HAMT.""" 248 249 if rpc_base_url is None: 250 rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL # pragma 251 if gateway_base_url is None: 252 gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL 253 254 if "/ipfs/" in gateway_base_url: 255 gateway_base_url = gateway_base_url.split("/ipfs/")[0] 256 257 # Standard gateway URL construction with proper path handling 258 if gateway_base_url.endswith("/"): 259 gateway_base_url = f"{gateway_base_url}ipfs/" 260 else: 261 gateway_base_url = f"{gateway_base_url}/ipfs/" 262 263 pin_string: str = "true" if pin_on_add else "false" 264 self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin={pin_string}" 265 """@private""" 266 self.gateway_base_url: str = gateway_base_url 267 """@private""" 268 269 if client is not None: 270 # A client was supplied by the user. We don't own it. 271 self._owns_client = False 272 self._client_per_loop = {asyncio.get_running_loop(): client} 273 else: 274 # No client supplied. We will own any clients we create. 275 self._owns_client = True 276 self._client_per_loop = {} 277 278 # store for later use by _loop_client() 279 self._default_headers = headers 280 self._default_auth = auth 281 282 self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency) 283 self._closed = False 284 285 # Validate retry parameters 286 if max_retries < 0: 287 raise ValueError("max_retries must be non-negative") 288 if initial_delay <= 0: 289 raise ValueError("initial_delay must be positive") 290 if backoff_factor < 1.0: 291 raise ValueError("backoff_factor must be >= 1.0 for exponential backoff") 292 293 self.max_retries = max_retries 294 self.initial_delay = initial_delay 295 self.backoff_factor = backoff_factor
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
httpx.AsyncClient Management
If `client` is not provided, KuboCAS creates its own `httpx.AsyncClient` lazily on first use, one per event loop. It is the responsibility of the user to close these at an appropriate time using `await cas.aclose()`,
as a class instance cannot know when it will no longer be in use unless explicitly told so.
If you use the KuboCAS instance in an `async with` block, internally-created clients are closed automatically when the block exits. A client you pass in yourself is never closed by KuboCAS; manage its lifetime separately, as the outer `async with httpx.AsyncClient()` does in the example we suggest below:
async with httpx.AsyncClient() as client, KuboCAS(
rpc_base_url=rpc_base_url,
gateway_base_url=gateway_base_url,
client=client,
) as kubo_cas:
hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
zhs = ZarrHAMTStore(hamt)
# Use the KuboCAS instance as needed
# ...
As mentioned, if you do not use the async with syntax, you should call await cas.aclose() when you are done using the instance to ensure that all resources are cleaned up.
cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose() # Ensure resources are cleaned up
Authenticated RPC/Gateway Access
Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own httpx.AsyncClient and then passing that in.
Alternatively, they can pass in headers and auth parameters to the constructor, which will be used to create a new httpx.AsyncClient if one is not provided.
If you do not need authentication, you can leave these parameters as None.
RPC and HTTP Gateway Base URLs
These are the base (first) part of each URL. Defaults matching the endpoints that kubo launches with on a local machine are provided.
The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.
async def aclose(self) -> None:
    """
    Release the httpx clients that this instance created for itself.

    If the client was supplied by the caller, this returns immediately and
    leaves all state untouched, because the caller owns that client.
    Otherwise every client in ``self._client_per_loop`` that is not already
    closed is closed; errors are ignored, since this is best-effort. The
    mapping is then emptied and the instance is flagged as closed.

    Must be awaited from an async context.
    """
    if not self._owns_client:
        # External client: its lifetime belongs to the caller.
        return

    # Iterate over a snapshot so the mapping can be cleared afterwards.
    # ``is_closed`` is checked lazily, right before each close attempt.
    for owned_client in list(self._client_per_loop.values()):
        if owned_client.is_closed:
            continue
        try:
            await owned_client.aclose()
        except Exception:
            # Best-effort cleanup: one failing client must not stop the rest.
            pass

    self._client_per_loop.clear()
    self._closed = True
Closes all internally-created clients. Must be called from an async context.
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
    """
    Add ``data`` through the Kubo RPC ``add`` endpoint (``self.rpc_url``) and return its CID.

    Transport failures (``httpx.TimeoutException`` / ``httpx.RequestError``)
    are retried up to ``self.max_retries`` times. The wait before retry
    number k is ``initial_delay * backoff_factor ** (k - 1)``, plus or minus
    5% random jitter. HTTP status errors are not retried.

    Args:
        data: The raw bytes to store.
        codec: Codec stamped onto the returned CID, unless the CID's codec
            code equals ``self.DAG_PB_MARKER``.

    Returns:
        The CID reported by Kubo in the ``Hash`` field, possibly with its
        codec replaced by ``codec``.

    Raises:
        httpx.TimeoutException: When every attempt failed with a transport
            error. The last underlying error is attached as ``__cause__``.
        httpx.HTTPStatusError: On a non-2xx response. It propagates
            immediately because it is not a ``RequestError``.
    """
    async with self._sem:
        files = {"file": data}
        client = self._loop_client()
        retry_count = 0

        while retry_count <= self.max_retries:
            try:
                response = await client.post(self.rpc_url, files=files, timeout=60.0)
                response.raise_for_status()
                cid_str: str = response.json()["Hash"]
                cid: CID = CID.decode(cid_str)
                if cid.codec.code != self.DAG_PB_MARKER:
                    cid = cid.set(codec=codec)
                return cid

            except (httpx.TimeoutException, httpx.RequestError) as e:
                retry_count += 1
                if retry_count > self.max_retries:
                    # Chain explicitly so the original transport error stays visible.
                    raise httpx.TimeoutException(
                        f"Failed to save data after {self.max_retries} retries: {str(e)}",
                        request=e.request
                        if isinstance(e, httpx.RequestError)
                        else None,
                    ) from e

                # Exponential backoff with +/-5% jitter to avoid a thundering herd.
                delay = self.initial_delay * (
                    self.backoff_factor ** (retry_count - 1)
                )
                jitter = delay * 0.1 * (random.random() - 0.5)
                await asyncio.sleep(delay + jitter)

    raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
codec is set to "dag-cbor" when this data should be marked as linked data in the sense of the IPLD data model.
async def load(
    self,
    id: IPLDKind,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    """
    Load data from a CID using the IPFS gateway with optional Range requests.

    Args:
        id: The CID to fetch. It is appended to ``self.gateway_base_url``.
        offset: First byte to return. If given, ``suffix`` is ignored.
        length: Number of bytes to return starting at ``offset``. It is
            only used together with ``offset``; without it, the read runs
            to the end of the object.
        suffix: Return only the last ``suffix`` bytes. It is only used when
            ``offset`` is None.

    Returns:
        The requested bytes. A zero-byte request (``length == 0`` with an
        offset, or ``suffix == 0``) returns ``b""`` without contacting the
        gateway. Those ranges cannot be expressed as a valid HTTP byte range:
        a server may ignore an invalid range and send the whole object, or
        answer 416.

    Raises:
        httpx.TimeoutException: When every attempt failed with a transport
            error. The last underlying error is attached as ``__cause__``.
        httpx.HTTPStatusError: On a non-2xx response. It is not retried.
    """
    cid = cast(CID, id)
    url: str = self.gateway_base_url + str(cid)
    headers: Dict[str, str] = {}

    # Construct the Range header if required
    if offset is not None:
        if length == 0:
            return b""
        if length is not None:
            # Standard HTTP Range: bytes=start-end (inclusive)
            end = offset + length - 1
            headers["Range"] = f"bytes={offset}-{end}"
        else:
            # Standard HTTP Range: bytes=start- (from start to end)
            headers["Range"] = f"bytes={offset}-"
    elif suffix is not None:
        if suffix == 0:
            return b""
        # Standard HTTP Range: bytes=-N (last N bytes)
        headers["Range"] = f"bytes=-{suffix}"

    async with self._sem:  # Throttle gateway
        client = self._loop_client()
        retry_count = 0

        while retry_count <= self.max_retries:
            try:
                response = await client.get(
                    url, headers=headers or None, timeout=60.0
                )
                response.raise_for_status()
                return response.content

            except (httpx.TimeoutException, httpx.RequestError) as e:
                retry_count += 1
                if retry_count > self.max_retries:
                    # Chain explicitly so the original transport error stays visible.
                    raise httpx.TimeoutException(
                        f"Failed to load data after {self.max_retries} retries: {str(e)}",
                        request=e.request
                        if isinstance(e, httpx.RequestError)
                        else None,
                    ) from e

                # Exponential backoff with +/-5% jitter.
                delay = self.initial_delay * (
                    self.backoff_factor ** (retry_count - 1)
                )
                jitter = delay * 0.1 * (random.random() - 0.5)
                await asyncio.sleep(delay + jitter)

    raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
Load data from a CID using the IPFS gateway with optional Range requests.
async def pin_cid(
    self,
    cid: CID,
    target_rpc: str = "http://127.0.0.1:5001",
) -> None:
    """
    Pin ``cid`` on a Kubo node through its ``/api/v0/pin/add`` RPC endpoint.

    The request sets ``recursive=true``, so every object linked from
    ``cid`` is pinned as well.

    Args:
        cid (CID): The Content ID to pin.
        target_rpc (str): Base RPC URL of the Kubo node. It defaults to a
            local daemon.

    Raises:
        httpx.HTTPStatusError: If Kubo answers with a non-2xx status.
    """
    query = dict(arg=str(cid), recursive="true")
    endpoint: str = f"{target_rpc}/api/v0/pin/add"

    async with self._sem:  # throttle RPC
        rpc_client = self._loop_client()
        reply = await rpc_client.post(endpoint, params=query)
        reply.raise_for_status()
Pins a CID to the local Kubo node via the RPC API.
This call is recursive by default, pinning all linked objects.
Args: cid (CID): The Content ID to pin. target_rpc (str): The RPC URL of the Kubo node.
async def unpin_cid(
    self, cid: CID, target_rpc: str = "http://127.0.0.1:5001"
) -> None:
    """
    Remove the pin for ``cid`` through Kubo's ``/api/v0/pin/rm`` RPC endpoint.

    The request sets ``recursive=true``.

    Args:
        cid (CID): The Content ID to unpin.
        target_rpc (str): Base RPC URL of the Kubo node. It defaults to a
            local daemon.

    Raises:
        httpx.HTTPStatusError: If Kubo answers with a non-2xx status.
    """
    query = dict(arg=str(cid), recursive="true")
    endpoint: str = f"{target_rpc}/api/v0/pin/rm"

    async with self._sem:  # throttle RPC
        rpc_client = self._loop_client()
        reply = await rpc_client.post(endpoint, params=query)
        reply.raise_for_status()
Unpins a CID from the local Kubo node via the RPC API.
Args: cid (CID): The Content ID to unpin. target_rpc (str): The RPC URL of the Kubo node.
async def pin_update(
    self,
    old_id: IPLDKind,
    new_id: IPLDKind,
    target_rpc: str = "http://127.0.0.1:5001",
) -> None:
    """
    Move a pin from ``old_id`` to ``new_id`` via Kubo's ``/api/v0/pin/update`` RPC endpoint.

    Both IDs are sent, in order, as repeated ``arg`` query parameters.

    Args:
        old_id (IPLDKind): The currently pinned Content ID to replace.
        new_id (IPLDKind): The Content ID to pin in its place.
        target_rpc (str): Base RPC URL of the Kubo node. It defaults to a
            local daemon.

    Raises:
        httpx.HTTPStatusError: If Kubo answers with a non-2xx status.
    """
    both_ids = [str(old_id), str(new_id)]
    query = {"arg": both_ids}
    endpoint: str = f"{target_rpc}/api/v0/pin/update"

    async with self._sem:  # throttle RPC
        rpc_client = self._loop_client()
        reply = await rpc_client.post(endpoint, params=query)
        reply.raise_for_status()
Updates the pinned CID in the storage.
Args: old_id (IPLDKind): The old Content ID to replace. new_id (IPLDKind): The new Content ID to pin. target_rpc (str): The RPC URL of the Kubo node.
async def pin_ls(
    self, target_rpc: str = "http://127.0.0.1:5001"
) -> Dict[str, Any]:
    """
    List the pins on a Kubo node via its ``/api/v0/pin/ls`` RPC endpoint.

    Args:
        target_rpc (str): Base RPC URL of the Kubo node. It defaults to a
            local daemon.

    Returns:
        Dict[str, Any]: The ``Keys`` field of Kubo's JSON response. For the
        non-streaming pin/ls call this maps each pinned CID string to its pin
        information, for example ``{"<cid>": {"Type": "recursive"}}``. An
        empty dict is returned when the response has no ``Keys`` field.

    Raises:
        httpx.HTTPStatusError: If Kubo answers with a non-2xx status.
    """
    pin_ls_url_base: str = f"{target_rpc}/api/v0/pin/ls"
    async with self._sem:  # throttle RPC
        client = self._loop_client()
        response = await client.post(pin_ls_url_base)
        response.raise_for_status()
        pins: Dict[str, Any] = response.json().get("Keys", {})
        return pins
Lists all pinned CIDs on the local Kubo node via the RPC API.
Args: target_rpc (str): The RPC URL of the Kubo node.
Returns: The value of the "Keys" field of Kubo's pin/ls JSON response (the pinned CIDs), or an empty value if that field is absent.
class ZarrHAMTStore(zarr.abc.store.Store):
    """
    Write and read Zarr v3s with a HAMT.

    Read **or** write a Zarr-v3 store whose key/value pairs live inside a
    py-hamt mapping.

    Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and
    the value is the raw byte payload produced by Zarr. No additional
    framing, compression, or encryption is applied by this class. For a Zarr
    encryption example where metadata remains available, see the method in
    https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
    For a fully encrypted zarr store, where metadata is not available, please see
    :class:`SimpleEncryptedZarrHAMTStore`, although we do not recommend using it.

    #### A note about using the same `ZarrHAMTStore` for writing and then reading again
    If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.

    #### Sample Code
    ```python
    # --- Write ---
    ds: xarray.Dataset = # ...
    cas: ContentAddressedStore = # ...
    # make sure values_are_bytes is True and read_only is False to enable writes
    hamt = await HAMT.build(cas, values_are_bytes=True)  # write-enabled
    zhs = ZarrHAMTStore(hamt, read_only=False)
    ds.to_zarr(store=zhs, mode="w", zarr_format=3)
    await hamt.make_read_only()  # flush + freeze
    root_node_id = hamt.root_node_id
    print(root_node_id)

    # --- Read ---
    hamt_ro = await HAMT.build(
        cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True)
    ds_ro = xarray.open_zarr(store=zhs_ro)

    print(ds_ro)
    xarray.testing.assert_identical(ds, ds_ro)
    ```
    """

    _forced_read_only: bool | None = None  # sentinel for wrapper clones

    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
        """
        ### `hamt` and `read_only`
        You need to make sure the following two things are true:

        1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is required because converting a HAMT to read only mode needs async operations, and `__init__` cannot be async.
        2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.

        ##### A note about the zarr chunk separator, "/" vs "."
        While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

        #### Metadata Read Cache
        `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are requested very frequently and repeatedly compared to all other keys, and the cache gives significant speed improvements. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
        """
        super().__init__(read_only=read_only)

        # Documented preconditions (see docstring above).
        assert hamt.read_only == read_only
        assert hamt.values_are_bytes
        self.hamt: HAMT = hamt
        """
        The internal HAMT.
        Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
        """

        self.metadata_read_cache: dict[str, bytes] = {}
        """@private"""

    def _map_byte_request(
        self, byte_range: Optional[zarr.abc.store.ByteRequest]
    ) -> tuple[Optional[int], Optional[int], Optional[int]]:
        """
        Translate a Zarr ``ByteRequest`` into the ``(offset, length, suffix)``
        triple accepted by ``HAMT.get``.

        Raises:
            ValueError: For a ``RangeByteRequest`` whose end is before its start.
            TypeError: For an unknown ``ByteRequest`` type.
        """
        offset: Optional[int] = None
        length: Optional[int] = None
        suffix: Optional[int] = None

        if byte_range is None:
            return offset, length, suffix

        if isinstance(byte_range, zarr.abc.store.RangeByteRequest):
            offset = byte_range.start
            length = byte_range.end - byte_range.start
            if length < 0:
                raise ValueError("End must be >= start for RangeByteRequest")
        elif isinstance(byte_range, zarr.abc.store.OffsetByteRequest):
            offset = byte_range.offset
        elif isinstance(byte_range, zarr.abc.store.SuffixByteRequest):
            suffix = byte_range.suffix
        else:
            raise TypeError(f"Unsupported ByteRequest type: {type(byte_range)}")

        return offset, length, suffix

    @property
    def read_only(self) -> bool:  # type: ignore[override]
        """Is the store read-only? A flag forced on a clone overrides the HAMT's own mode."""
        if self._forced_read_only is not None:  # instance attr overrides
            return self._forced_read_only
        return self.hamt.read_only

    def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
        """
        Return this store (if the flag already matches) or a *shallow*
        clone that presents the requested read‑only status.

        The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
        instance; no flushing, network traffic or async work is done.
        The metadata cache is copied, not shared.
        """
        # Fast path
        if read_only == self.read_only:
            return self  # Same mode, return same instance

        # Create a *bare* instance without running its __init__, whose
        # assertion would reject a flag that differs from the HAMT's mode.
        clone = type(self).__new__(type(self))

        # Copy attributes that matter
        clone.hamt = self.hamt  # Share the HAMT
        clone._forced_read_only = read_only
        clone.metadata_read_cache = self.metadata_read_cache.copy()

        # Re‑initialise the zarr base class so that Zarr sees the flag
        zarr.abc.store.Store.__init__(clone, read_only=read_only)
        return clone

    def __eq__(self, other: object) -> bool:
        """@private"""
        if not isinstance(other, ZarrHAMTStore):
            return False
        return self.hamt.root_node_id == other.hamt.root_node_id

    async def get(
        self,
        key: str,
        prototype: zarr.core.buffer.BufferPrototype,
        byte_range: zarr.abc.store.ByteRequest | None = None,
    ) -> zarr.core.buffer.Buffer | None:
        """@private"""
        # Only whole-object reads of "zarr.json" keys go through the metadata cache.
        use_cache: bool = byte_range is None and key.endswith("zarr.json")
        try:
            val: bytes
            if use_cache and key in self.metadata_read_cache:
                val = self.metadata_read_cache[key]
            else:
                offset, length, suffix = self._map_byte_request(byte_range)
                # Values are always bytes since only bytes are stored in the HAMT.
                val = cast(
                    bytes,
                    await self.hamt.get(
                        key, offset=offset, length=length, suffix=suffix
                    ),
                )
                if use_cache:
                    self.metadata_read_cache[key] = val

            return prototype.buffer.from_bytes(val)
        except KeyError:
            # Zarr sometimes queries keys that don't exist (anymore); report them as missing.
            return None

    async def get_partial_values(
        self,
        prototype: zarr.core.buffer.BufferPrototype,
        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
    ) -> list[zarr.core.buffer.Buffer | None]:
        """
        Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
        """
        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

    async def exists(self, key: str) -> bool:
        """@private"""
        try:
            await self.hamt.get(key)
            return True
        except KeyError:
            return False

    @property
    def supports_writes(self) -> bool:
        """@private"""
        return not self.hamt.read_only

    @property
    def supports_partial_writes(self) -> bool:
        """@private"""
        return False

    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """@private"""
        if self.read_only:
            raise Exception("Cannot write to a read-only store.")

        # Keep an already-cached metadata entry in sync with the new value.
        if key in self.metadata_read_cache:
            self.metadata_read_cache[key] = value.to_bytes()
        await self.hamt.set(key, value.to_bytes())

    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """@private"""
        if not (await self.exists(key)):
            await self.set(key, value)

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        """@private"""
        raise NotImplementedError

    @property
    def supports_deletes(self) -> bool:
        """@private"""
        return not self.hamt.read_only

    async def delete(self, key: str) -> None:
        """@private"""
        if self.read_only:
            raise Exception("Cannot write to a read-only store.")
        # Evict first so a later get() cannot serve a stale cached zarr.json,
        # even if the key turns out to be absent from the HAMT.
        self.metadata_read_cache.pop(key, None)
        try:
            await self.hamt.delete(key)
        except KeyError:
            # Zarr v3 sometimes deletes keys that don't exist (or were already deleted).
            return

    @property
    def supports_listing(self) -> bool:
        """@private"""
        return True

    async def list(self) -> AsyncIterator[str]:
        """@private"""
        async for key in self.hamt.keys():
            yield key

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        """@private"""
        async for key in self.hamt.keys():
            if key.startswith(prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        """
        @private
        List *immediate* children that live directly under **prefix**.

        This is similar to :py:meth:`list_prefix` but collapses everything
        below the first ``"/"`` after *prefix*. Each child name is yielded
        **exactly once** in the order of first appearance while scanning the
        HAMT keys.

        Parameters
        ----------
        prefix : str
            Logical directory path. *Must* end with ``"/"`` for the result to
            make sense (e.g. ``"a/b/"``).

        Yields
        ------
        str
            The name of each direct child (file or sub-directory) of *prefix*.

        Examples
        --------
        With keys ::

            a/b/c/d
            a/b/c/e
            a/b/f
            a/b/g/h/i

        ``await list_dir("a/b/")`` produces ::

            c
            f
            g

        Notes
        -----
        • Internally uses a :class:`set` to deduplicate names; memory grows
          with the number of *unique* children, not the total number of keys.
        • Order is **not** sorted; it reflects the first encounter while
          iterating over :py:meth:`HAMT.keys`.
        """
        seen_names: set[str] = set()
        async for key in self.hamt.keys():
            if not key.startswith(prefix):
                continue
            # Everything before the first "/" after the prefix is the child name.
            name: str = key[len(prefix) :].split("/", 1)[0]
            if name not in seen_names:
                seen_names.add(name)
                yield name
Write and read Zarr v3s with a HAMT.
Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.
Keys are stored verbatim ("temp/c/0/0/0" → same string in HAMT) and
the value is the raw byte payload produced by Zarr. No additional
framing, compression, or encryption is applied by this class. For a zarr encryption example
where metadata remains available, see the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
For a fully encrypted zarr store, where metadata is not available, please see
SimpleEncryptedZarrHAMTStore, although we do not recommend using it.
A note about using the same ZarrHAMTStore for writing and then reading again
If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings.
Sample Code
# --- Write ---
ds: xarray.Dataset = # ...
cas: ContentAddressedStore = # ...
hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True) # write-enabled
zhs = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only() # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)
# --- read ---
hamt_ro = await HAMT.build(
cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)
print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
    """
    Wrap ``hamt`` as a Zarr v3 store.

    ### `hamt` and `read_only`
    Two preconditions are enforced with ``assert``:

    1. `hamt.read_only == read_only`. The store cannot switch the HAMT's
       mode itself, because making a HAMT read only needs async operations,
       and `__init__` cannot be async.
    2. `hamt.values_are_bytes == True`, which keeps Zarr v3 operations efficient.

    ##### A note about the zarr chunk separator, "/" vs "."
    Zarr v2 used periods by default. Zarr v3 uses forward slashes, and this
    store assumes forward slashes.

    #### Metadata Read Cache
    `ZarrHAMTStore` keeps an in-memory read cache for metadata. "zarr.json"
    files are requested far more often than other keys, so caching them
    speeds things up noticeably. The cache has no eviction step: "zarr.json"
    files are small compared to the zarr data itself.
    """
    super().__init__(read_only=read_only)

    # Precondition 1: the HAMT's mode must match the requested mode.
    assert hamt.read_only == read_only
    # Precondition 2: the HAMT must hold raw bytes.
    assert hamt.values_are_bytes

    self.hamt: HAMT = hamt
    """
    The internal HAMT.
    When writing is finished, switch the HAMT to read only mode as usual to obtain the root node ID.
    """

    self.metadata_read_cache: dict[str, bytes] = {}
    """@private"""
hamt and read_only
You need to make sure the following two things are true:
- The HAMT is in the same read only mode that you are passing into the Zarr store. This means that
hamt.read_only == read_only. This is required because converting a HAMT to read only mode needs async operations, and __init__ cannot be async.
- The HAMT has hamt.values_are_bytes == True. This improves efficiency with Zarr v3 operations.
A note about the zarr chunk separator, "/" vs "."
While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
Metadata Read Cache
ZarrHAMTStore has an internal read cache for metadata. In practice, metadata "zarr.json" files are requested very frequently and repeatedly compared to all other keys, and the cache gives significant speed improvements. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data.
The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
@property
def read_only(self) -> bool:  # type: ignore[override]
    """Is the store read-only? A flag forced on a clone wins over the HAMT's own mode."""
    forced = self._forced_read_only
    return self.hamt.read_only if forced is None else forced
Is the store read-only?
def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
    """
    Get a view of this store with the requested read-only flag.

    If the flag already matches, ``self`` is returned. Otherwise a *shallow*
    clone is built that **shares** the same :class:`~py_hamt.hamt.HAMT`
    and holds its own copy of the metadata cache. No flushing, network
    traffic or async work is done.
    """
    if read_only != self.read_only:
        store_cls = type(self)
        # Bypass __init__, whose assertion would reject a flag that differs from the HAMT's mode.
        twin = store_cls.__new__(store_cls)
        twin.hamt = self.hamt
        twin._forced_read_only = read_only
        twin.metadata_read_cache = dict(self.metadata_read_cache)
        # Let the zarr base class record the new flag.
        zarr.abc.store.Store.__init__(twin, read_only=read_only)
        return twin
    return self
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same ~py_hamt.hamt.HAMT
instance; no flushing, network traffic or async work is done.
async def get_partial_values(
    self,
    prototype: zarr.core.buffer.BufferPrototype,
    key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
) -> list[zarr.core.buffer.Buffer | None]:
    """
    Fetch several keys or byte ranges at once, concurrently via asyncio.gather.

    Results come back in the same order as ``key_ranges``. The first failing
    ``get`` raises out of this call.
    """
    pending = [self.get(name, prototype, rng) for name, rng in key_ranges]
    return await asyncio.gather(*pending, return_exceptions=False)
Retrieves multiple keys or byte ranges concurrently using asyncio.gather.
class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore):
    """
    Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.

    This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
    stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
    and data chunks. This provides strong privacy but means the Zarr store is
    completely opaque and unusable without the correct encryption key and header.

    Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb

    #### Encryption Details
    - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
    - Requires a 32-byte encryption key and a header.
    - Each encrypted value includes a 24-byte nonce and a 16-byte tag.

    #### Important Considerations
    - Since metadata is encrypted, standard Zarr tools cannot inspect the
      dataset without prior decryption using this store class.
    - There is no support for partial encryption or excluding variables.
    - Decrypted `zarr.json` metadata is cached in memory after it is first read.
    - Byte-range reads are supported, but each one fetches and decrypts the
      whole value before slicing, because authentication covers the entire
      ciphertext.

    #### Sample Code
    ```python
    import xarray
    from py_hamt import HAMT, KuboCAS  # Assuming a KuboCAS or similar
    from Crypto.Random import get_random_bytes
    import numpy as np

    # Setup
    ds = xarray.Dataset(
        {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
        coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
    )
    cas = KuboCAS()  # Example ContentAddressedStore
    encryption_key = get_random_bytes(32)
    header = b"fully-encrypted-zarr"

    # --- Write ---
    hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
    ezhs_write = SimpleEncryptedZarrHAMTStore(
        hamt_write, False, encryption_key, header
    )
    print("Writing fully encrypted Zarr...")
    ds.to_zarr(store=ezhs_write, mode="w")
    await hamt_write.make_read_only()
    root_node_id = hamt_write.root_node_id
    print(f"Wrote Zarr with root: {root_node_id}")

    # --- Read ---
    hamt_read = await HAMT.build(
        cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
    )
    ezhs_read = SimpleEncryptedZarrHAMTStore(
        hamt_read, True, encryption_key, header
    )
    print("\\nReading fully encrypted Zarr...")
    ds_read = xarray.open_zarr(store=ezhs_read)
    print("Read back dataset:")
    print(ds_read)
    xarray.testing.assert_identical(ds, ds_read)
    print("Read successful and data verified.")

    # --- Read with wrong key (demonstrates failure) ---
    wrong_key = get_random_bytes(32)
    hamt_bad = await HAMT.build(
        cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
    )
    ezhs_bad = SimpleEncryptedZarrHAMTStore(
        hamt_bad, True, wrong_key, header
    )
    print("\\nAttempting to read with wrong key...")
    try:
        ds_bad = xarray.open_zarr(store=ezhs_bad)
        print(ds_bad)
    except Exception as e:
        print(f"Failed to read as expected: {type(e).__name__} - {e}")
    ```
    """

    def __init__(
        self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
    ) -> None:
        """
        Initializes the SimpleEncryptedZarrHAMTStore.

        Args:
            hamt: The HAMT instance for storage. Must have `values_are_bytes=True`.
                  Its `read_only` status must match the `read_only` argument.
            read_only: If True, the store is in read-only mode.
            encryption_key: A 32-byte key for ChaCha20-Poly1305.
            header: A header (bytes) used as associated data in encryption.

        Raises:
            ValueError: If `encryption_key` is not exactly 32 bytes long.
        """
        super().__init__(hamt, read_only=read_only)

        if len(encryption_key) != 32:
            raise ValueError("Encryption key must be exactly 32 bytes long.")
        self.encryption_key = encryption_key
        self.header = header
        self.metadata_read_cache: dict[str, bytes] = {}

    def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
        """
        Return this store if the flag already matches; otherwise return a shallow
        clone with the requested read-only flag.

        The clone shares the HAMT, the key, the header and (unlike the base class)
        the decrypted metadata cache itself rather than a copy.
        """
        if read_only == self.read_only:
            return self

        clone = type(self).__new__(type(self))
        clone.hamt = self.hamt
        clone.encryption_key = self.encryption_key
        clone.header = self.header
        clone.metadata_read_cache = self.metadata_read_cache
        clone._forced_read_only = read_only  # safe; attribute is declared
        zarr.abc.store.Store.__init__(clone, read_only=read_only)
        return clone

    def _encrypt(self, val: bytes) -> bytes:
        """Encrypt ``val``; the result is laid out as ``nonce(24) + tag(16) + ciphertext``."""
        nonce = get_random_bytes(24)  # XChaCha20 uses a 24-byte nonce
        cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
        cipher.update(self.header)
        ciphertext, tag = cipher.encrypt_and_digest(val)
        return nonce + tag + ciphertext

    def _decrypt(self, val: bytes) -> bytes:
        """
        Decrypt and authenticate a value produced by :meth:`_encrypt`.

        Raises:
            ValueError: If decryption or authentication fails for any reason.
        """
        try:
            # Extract nonce (24), tag (16), and ciphertext
            nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
            cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce)
            cipher.update(self.header)
            plaintext = cipher.decrypt_and_verify(ciphertext, tag)
            return plaintext
        except Exception as e:
            # Wrong key/header, a bad tag or a truncated value can all fail here.
            raise ValueError(
                "Decryption failed. Check key, header, or data integrity."
            ) from e

    def _slice_byte_range(
        self, data: bytes, byte_range: zarr.abc.store.ByteRequest | None
    ) -> bytes:
        """
        Apply a Zarr byte request to an already-decrypted value.

        Ciphertext cannot be range-read (authentication covers the whole value),
        so ranges are resolved on the plaintext instead.
        """
        if byte_range is None:
            return data
        offset, length, suffix = self._map_byte_request(byte_range)
        if suffix is not None:
            # max() keeps suffix == 0 -> b"" and tolerates suffix > len(data).
            return data[max(len(data) - suffix, 0) :]
        start = offset or 0
        if length is None:
            return data[start:]
        return data[start : start + length]

    def __eq__(self, other: object) -> bool:
        """@private"""
        if not isinstance(other, SimpleEncryptedZarrHAMTStore):
            return False
        return (
            self.hamt.root_node_id == other.hamt.root_node_id
            and self.encryption_key == other.encryption_key
            and self.header == other.header
        )

    async def get(
        self,
        key: str,
        prototype: zarr.core.buffer.BufferPrototype,
        byte_range: zarr.abc.store.ByteRequest | None = None,
    ) -> zarr.core.buffer.Buffer | None:
        """@private"""
        try:
            decrypted_val: bytes
            is_metadata: bool = key.endswith("zarr.json")

            if is_metadata and key in self.metadata_read_cache:
                decrypted_val = self.metadata_read_cache[key]
            else:
                # Values are always bytes since only bytes are stored in the HAMT.
                raw_val = cast(bytes, await self.hamt.get(key))
                decrypted_val = self._decrypt(raw_val)
                if is_metadata:
                    # Cache the whole decrypted document; ranges are sliced below.
                    self.metadata_read_cache[key] = decrypted_val
            return prototype.buffer.from_bytes(
                self._slice_byte_range(decrypted_val, byte_range)
            )
        except KeyError:
            return None

    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """@private"""
        if self.read_only:
            raise Exception("Cannot write to a read-only store.")

        raw_bytes = value.to_bytes()
        # Keep an already-cached (plaintext) metadata entry in sync.
        if key in self.metadata_read_cache:
            self.metadata_read_cache[key] = raw_bytes
        # Encrypt it
        encrypted_bytes = self._encrypt(raw_bytes)
        await self.hamt.set(key, encrypted_bytes)
Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.
This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.
Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.
#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- Decrypted zarr.json metadata is cached in memory after it is first read.
#### Sample Code
import xarray
from py_hamt import HAMT, KuboCAS, SimpleEncryptedZarrHAMTStore  # KuboCAS or another ContentAddressedStore, e.g. InMemoryCAS
from Crypto.Random import get_random_bytes
import numpy as np
# Setup
ds = xarray.Dataset(
{"data": (("y", "x"), np.arange(12).reshape(3, 4))},
coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
)
cas = KuboCAS() # Example ContentAddressedStore
encryption_key = get_random_bytes(32)
header = b"fully-encrypted-zarr"
# --- Write ---
hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
ezhs_write = SimpleEncryptedZarrHAMTStore(
hamt_write, False, encryption_key, header
)
print("Writing fully encrypted Zarr...")
ds.to_zarr(store=ezhs_write, mode="w")
await hamt_write.make_read_only()
root_node_id = hamt_write.root_node_id
print(f"Wrote Zarr with root: {root_node_id}")
# --- Read ---
hamt_read = await HAMT.build(
cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
)
ezhs_read = SimpleEncryptedZarrHAMTStore(
hamt_read, True, encryption_key, header
)
print("\nReading fully encrypted Zarr...")
ds_read = xarray.open_zarr(store=ezhs_read)
print("Read back dataset:")
print(ds_read)
xarray.testing.assert_identical(ds, ds_read)
print("Read successful and data verified.")
# --- Read with wrong key (demonstrates failure) ---
wrong_key = get_random_bytes(32)
hamt_bad = await HAMT.build(
cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
ezhs_bad = SimpleEncryptedZarrHAMTStore(
hamt_bad, True, wrong_key, header
)
print("\nAttempting to read with wrong key...")
try:
ds_bad = xarray.open_zarr(store=ezhs_bad)
print(ds_bad)
except Exception as e:
print(f"Failed to read as expected: {type(e).__name__} - {e}")
def __init__(
    self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes
) -> None:
    """
    Set up a fully encrypting Zarr store on top of a HAMT.

    Args:
        hamt: Backing HAMT. Must have `values_are_bytes=True`, and its
            `read_only` status must match the `read_only` argument.
        read_only: Whether this store rejects writes.
        encryption_key: Exactly 32 bytes, used as the ChaCha20-Poly1305 key.
        header: Bytes used as associated data during encryption.

    Raises:
        ValueError: If `encryption_key` is not 32 bytes long.
    """
    # Parent initialisation runs first, exactly as before.
    super().__init__(hamt, read_only=read_only)

    key_length = len(encryption_key)
    if key_length != 32:
        raise ValueError("Encryption key must be exactly 32 bytes long.")

    self.encryption_key = encryption_key
    self.header = header
    # Decrypted `zarr.json` payloads keyed by store key; get() fills it on first read.
    self.metadata_read_cache: dict[str, bytes] = {}
Initializes the SimpleEncryptedZarrHAMTStore.
Args:
hamt: The HAMT instance for storage. Must have values_are_bytes=True.
Its read_only status must match the read_only argument.
read_only: If True, the store is in read-only mode.
encryption_key: A 32-byte key for ChaCha20-Poly1305.
header: A header (bytes) used as associated data in encryption.
def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
    """
    Return a view of this store with the requested read-only flag.

    When `read_only` already equals `self.read_only`, this very store is
    returned. Otherwise a shallow clone is built without running `__init__`.
    The clone shares this store's HAMT, encryption key, header and metadata
    cache; only the read-only flag differs.
    """
    if read_only != self.read_only:
        cls = type(self)
        twin = cls.__new__(cls)  # bare instance; __init__ deliberately skipped
        # Shared by reference, not copied.
        twin.hamt = self.hamt
        twin.encryption_key = self.encryption_key
        twin.header = self.header
        twin.metadata_read_cache = self.metadata_read_cache
        twin._forced_read_only = read_only  # safe; attribute is declared
        # Re-run the zarr base initialiser so zarr sees the new flag.
        zarr.abc.store.Store.__init__(twin, read_only=read_only)
        return twin
    return self
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same `py_hamt.hamt.HAMT` instance, encryption key, header and metadata cache; no flushing, network traffic or async work is done.
class ShardedZarrStore(zarr.abc.store.Store):
    """
    Implements the Zarr Store API using a sharded layout for chunk CIDs.

    This store divides the flat index of chunk CIDs into multiple "shards".
    Each shard is a DAG-CBOR array where each element is either a CID link
    to a chunk or a null value if the chunk is empty. This structure allows
    for efficient traversal by IPLD-aware systems.

    The store's root object contains:
    1. A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
    2. A list of CIDs, where each CID points to a shard object.
    3. Sharding configuration details (e.g., chunks_per_shard).

    Keys containing a ``/c/`` marker are chunk keys. They are placed in the
    single row-major chunk grid described by the root's ``array_shape`` and
    ``chunk_shape``. Every other key is stored as an entry in the root's
    metadata mapping: ``*.json`` keys, keys without ``/c/``, and chunks of
    the arrays in ``_EXCLUDED_ARRAY_NAMES``.
    """

    # Arrays whose chunk keys bypass the sharded grid and are stored as
    # metadata entries instead (see `_parse_chunk_key`).
    _EXCLUDED_ARRAY_NAMES = frozenset({"time", "lat", "lon", "latitude", "longitude"})
    # `zarr.json` keys under these prefixes never trigger `resize_store` in `set`.
    _COORDINATE_KEY_PREFIXES = ("time/", "lat/", "latitude/", "lon/", "longitude/")

    def __init__(
        self,
        cas: ContentAddressedStore,
        read_only: bool,
        root_cid: Optional[str] = None,
        *,
        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
    ):
        """Use the async `open()` classmethod to instantiate this class."""
        super().__init__(read_only=read_only)
        self.cas = cas
        self._root_cid = root_cid
        self._root_obj: dict

        self._resize_lock = asyncio.Lock()
        # An event to signal when a resize is in-progress.
        # It starts in the "set" state, allowing all operations to proceed.
        self._resize_complete = asyncio.Event()
        self._resize_complete.set()
        self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)

        self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
        self._pending_shard_loads: Dict[int, asyncio.Event] = {}

        self._array_shape: Tuple[int, ...]
        self._chunk_shape: Tuple[int, ...]
        self._chunks_per_dim: Tuple[int, ...]
        self._chunks_per_shard: int
        self._num_shards: int = 0
        self._total_chunks: int = 0

        self._dirty_root = False

    def __update_geometry(self):
        """
        Recompute the derived grid sizes from the array and chunk shapes.

        Reads ``_array_shape``, ``_chunk_shape`` and ``_chunks_per_shard``.
        Sets ``_chunks_per_dim``, ``_total_chunks`` and ``_num_shards``.

        Raises:
            ValueError: If the two shapes have different ranks, a chunk
                dimension is not positive, or an array dimension is negative.
        """
        if len(self._array_shape) != len(self._chunk_shape):
            # zip() below would otherwise silently drop the extra dimensions.
            raise ValueError(
                "array_shape and chunk_shape must have the same number of dimensions."
            )
        if not all(cs > 0 for cs in self._chunk_shape):
            raise ValueError("All chunk_shape dimensions must be positive.")
        if not all(s >= 0 for s in self._array_shape):
            raise ValueError("All array_shape dimensions must be non-negative.")

        # Integer ceiling division: exact even for very large dimensions.
        self._chunks_per_dim = tuple(
            -(-a // c) for a, c in zip(self._array_shape, self._chunk_shape)
        )
        self._total_chunks = math.prod(self._chunks_per_dim)
        self._num_shards = (
            (self._total_chunks + self._chunks_per_shard - 1) // self._chunks_per_shard
            if self._total_chunks > 0
            else 0
        )

    @classmethod
    async def open(
        cls,
        cas: ContentAddressedStore,
        read_only: bool,
        root_cid: Optional[str] = None,
        *,
        array_shape: Optional[Tuple[int, ...]] = None,
        chunk_shape: Optional[Tuple[int, ...]] = None,
        chunks_per_shard: Optional[int] = None,
        max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
    ) -> "ShardedZarrStore":
        """
        Asynchronously opens an existing ShardedZarrStore or initializes a new one.

        With ``root_cid`` the root object is loaded from the CAS. Without it, a
        writable store is created from ``array_shape``, ``chunk_shape`` and
        ``chunks_per_shard``.

        Raises:
            ValueError: If required arguments are missing or invalid, or if a
                read-only store is requested without ``root_cid``.
        """
        store = cls(
            cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
        )
        if root_cid:
            await store._load_root_from_cid()
        elif not read_only:
            if array_shape is None or chunk_shape is None:
                raise ValueError(
                    "array_shape and chunk_shape must be provided for a new store."
                )

            if not isinstance(chunks_per_shard, int) or chunks_per_shard <= 0:
                raise ValueError("chunks_per_shard must be a positive integer.")

            store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
        else:
            raise ValueError("root_cid must be provided for a read-only store.")
        return store

    def _initialize_new_root(
        self,
        array_shape: Tuple[int, ...],
        chunk_shape: Tuple[int, ...],
        chunks_per_shard: int,
    ):
        """Build an empty root object for a new store and mark it dirty."""
        # Normalise to tuples. set() compares `tuple(shape) != self._array_shape`,
        # so a list here would trigger a spurious resize on every zarr.json write.
        self._array_shape = tuple(array_shape)
        self._chunk_shape = tuple(chunk_shape)
        self._chunks_per_shard = chunks_per_shard

        self.__update_geometry()

        self._root_obj = {
            "manifest_version": "sharded_zarr_v1",
            "metadata": {},
            "chunks": {
                "array_shape": list(self._array_shape),
                "chunk_shape": list(self._chunk_shape),
                "sharding_config": {
                    "chunks_per_shard": self._chunks_per_shard,
                },
                "shard_cids": [None] * self._num_shards,
            },
        }
        self._dirty_root = True

    async def _load_root_from_cid(self):
        """
        Load and validate the root object stored at ``self._root_cid``.

        Raises:
            ValueError: If the root cannot be decoded, has the wrong structure
                or manifest version, or its shard count disagrees with its
                geometry.
        """
        root_bytes = await self.cas.load(self._root_cid)
        try:
            self._root_obj = dag_cbor.decode(root_bytes)
            if not isinstance(self._root_obj, dict) or "chunks" not in self._root_obj:
                raise ValueError(
                    "Root object is not a valid dictionary with 'chunks' key."
                )
            if not isinstance(self._root_obj["chunks"]["shard_cids"], list):
                raise ValueError("shard_cids is not a list.")
        except Exception as e:
            raise ValueError(f"Failed to decode root object: {e}") from e

        if self._root_obj.get("manifest_version") != "sharded_zarr_v1":
            raise ValueError(
                f"Incompatible manifest version: {self._root_obj.get('manifest_version')}. Expected 'sharded_zarr_v1'."
            )

        chunk_info = self._root_obj["chunks"]
        self._array_shape = tuple(chunk_info["array_shape"])
        self._chunk_shape = tuple(chunk_info["chunk_shape"])
        self._chunks_per_shard = chunk_info["sharding_config"]["chunks_per_shard"]

        self.__update_geometry()

        if len(chunk_info["shard_cids"]) != self._num_shards:
            raise ValueError(
                f"Inconsistent number of shards. Expected {self._num_shards}, found {len(chunk_info['shard_cids'])}."
            )

    async def _fetch_and_cache_full_shard(
        self,
        shard_idx: int,
        shard_cid: str,
        max_retries: int = 3,
        retry_delay: float = 1.0,
    ) -> None:
        """
        Fetch a shard from CAS and cache it, with retry logic for transient errors.

        On every exit path (success, failure or cancellation), the
        pending-load event registered for ``shard_idx`` is set and removed.
        Coroutines waiting in `_load_or_initialize_shard_cache` therefore
        never hang on a failed load.

        Args:
            shard_idx: The index of the shard to fetch.
            shard_cid: The CID of the shard.
            max_retries: Maximum number of attempts for transient errors
                (at least one attempt is always made).
            retry_delay: Base delay between attempts in seconds; doubled each retry.

        Raises:
            RuntimeError: If transient errors persist on every attempt.
            TypeError: If the shard does not decode to a list.
        """
        # The event other coroutines are waiting on, captured so that cleanup
        # never touches an event registered later by a different load.
        pending = self._pending_shard_loads.get(shard_idx)
        attempts = max(1, max_retries)
        try:
            for attempt in range(attempts):
                try:
                    shard_data_bytes = await self.cas.load(shard_cid)
                    decoded_shard = dag_cbor.decode(shard_data_bytes)
                    if not isinstance(decoded_shard, list):
                        raise TypeError(f"Shard {shard_idx} did not decode to a list.")
                    await self._shard_data_cache.put(shard_idx, decoded_shard)
                    return  # Success
                except (ConnectionError, TimeoutError) as e:
                    # Handle transient errors (e.g., network issues)
                    if attempt < attempts - 1:
                        await asyncio.sleep(retry_delay * (2**attempt))  # Exponential backoff
                        continue
                    raise RuntimeError(
                        f"Failed to fetch shard {shard_idx} after {attempts} attempts: {e}"
                    ) from e
        finally:
            if pending is not None:
                pending.set()
                if self._pending_shard_loads.get(shard_idx) is pending:
                    del self._pending_shard_loads[shard_idx]

    def _parse_chunk_key(self, key: str) -> Optional[Tuple[int, ...]]:
        """
        Return the chunk-grid coordinates encoded in ``key``.

        Returns None when ``key`` is to be stored as a metadata entry. That
        covers ``*.json`` keys, keys without a ``/c/`` marker, and chunks of
        arrays named in ``_EXCLUDED_ARRAY_NAMES``.

        NOTE(review): the array name is not part of the resulting index. Chunk
        keys of two different arrays with equal coordinates therefore map to
        the same slot. The store presumably holds a single gridded data
        variable; verify.

        Raises:
            ValueError: If a coordinate after ``/c/`` is not an integer.
            IndexError: If the coordinate count differs from the grid rank or
                a coordinate is out of bounds.
        """
        # 1. Exclude .json files immediately (metadata)
        if key.endswith(".json"):
            return None

        chunk_marker = "/c/"
        marker_idx = key.rfind(chunk_marker)  # rfind: the last "/c/" starts the coordinates
        if marker_idx == -1:
            # No "/c/" marker: not a chunk key in the expected format. This
            # includes root-level array chunk keys such as "c/0/0", which have
            # no leading "/" and are therefore also stored as metadata.
            return None

        # The array name is the last path component before "/c/",
        # e.g. "temp" for "temp/c/0/0/0" or "lat" for "group1/lat/c/0".
        path_before_c = key[:marker_idx]
        actual_array_name = path_before_c.rsplit("/", 1)[-1]

        # 2. Coordinate arrays in the exclusion list are treated as metadata.
        if actual_array_name in self._EXCLUDED_ARRAY_NAMES:
            return None

        coord_part = key[marker_idx + len(chunk_marker) :]
        coords = tuple(int(part) for part in coord_part.split("/"))

        # Validate coordinates against the chunk grid of the store's configured array.
        if len(coords) != len(self._chunks_per_dim):
            raise IndexError(
                f"Chunk key '{key}' has {len(coords)} coordinates but the chunk grid has {len(self._chunks_per_dim)} dimensions."
            )
        for i, (c_coord, n_chunks) in enumerate(zip(coords, self._chunks_per_dim)):
            if not (0 <= c_coord < n_chunks):
                raise IndexError(
                    f"Chunk coordinate {c_coord} at dimension {i} is out of bounds for dimension size {n_chunks}."
                )
        return coords

    def _get_linear_chunk_index(self, chunk_coords: Tuple[int, ...]) -> int:
        """Convert N-D chunk coordinates to a flat row-major index."""
        linear_index = 0
        multiplier = 1
        for i in reversed(range(len(self._chunks_per_dim))):
            linear_index += chunk_coords[i] * multiplier
            multiplier *= self._chunks_per_dim[i]
        return linear_index

    def _get_shard_info(self, linear_chunk_index: int) -> Tuple[int, int]:
        """Return ``(shard_idx, index_in_shard)`` for a flat chunk index."""
        shard_idx, index_in_shard = divmod(linear_chunk_index, self._chunks_per_shard)
        return shard_idx, index_in_shard

    async def _load_or_initialize_shard_cache(
        self, shard_idx: int
    ) -> List[Optional[CID]]:
        """
        Load a shard into the cache or initialize an empty shard if it doesn't exist.

        If another coroutine is already fetching the shard, this waits (up to
        60 s) for that fetch instead of starting a second one.

        Args:
            shard_idx: The index of the shard to load or initialize.

        Returns:
            List[Optional[CID]]: The shard data (list of CIDs or None).

        Raises:
            ValueError: If the shard index is out of bounds.
            RuntimeError: If the shard cannot be loaded or initialized.
        """
        cached_shard = await self._shard_data_cache.get(shard_idx)
        if cached_shard is not None:
            return cached_shard

        if shard_idx in self._pending_shard_loads:
            try:
                await asyncio.wait_for(
                    self._pending_shard_loads[shard_idx].wait(), timeout=60.0
                )
                cached_shard = await self._shard_data_cache.get(shard_idx)
                if cached_shard is not None:
                    return cached_shard
                else:
                    raise RuntimeError(
                        f"Shard {shard_idx} not found in cache after pending load completed."
                    )
            except asyncio.TimeoutError:
                # Clean up the pending load to allow retry
                if shard_idx in self._pending_shard_loads:
                    self._pending_shard_loads[shard_idx].set()
                    del self._pending_shard_loads[shard_idx]
                raise RuntimeError(f"Timeout waiting for shard {shard_idx} to load.")

        if not (0 <= shard_idx < self._num_shards):
            raise ValueError(f"Shard index {shard_idx} out of bounds.")

        shard_cid_obj = self._root_obj["chunks"]["shard_cids"][shard_idx]
        if shard_cid_obj:
            # Register the load so concurrent callers wait on it; the fetch
            # always sets and removes this event when it finishes.
            self._pending_shard_loads[shard_idx] = asyncio.Event()
            await self._fetch_and_cache_full_shard(shard_idx, str(shard_cid_obj))
        else:
            empty_shard = [None] * self._chunks_per_shard
            await self._shard_data_cache.put(shard_idx, empty_shard)

        result = await self._shard_data_cache.get(shard_idx)
        if result is None:
            raise RuntimeError(f"Failed to load or initialize shard {shard_idx}")
        return result  # type: ignore[return-value]

    async def set_partial_values(
        self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
    ) -> None:
        """Always raises NotImplementedError: partial writes are unsupported."""
        raise NotImplementedError(
            "Partial writes are not supported by ShardedZarrStore."
        )

    async def get_partial_values(
        self,
        prototype: zarr.core.buffer.BufferPrototype,
        key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
    ) -> List[Optional[zarr.core.buffer.Buffer]]:
        """Fetch all ``(key, byte_range)`` pairs concurrently, in input order."""
        tasks = [self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
        results = await asyncio.gather(*tasks)
        return results

    def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
        """
        Return this store (if the flag already matches) or a *shallow*
        clone that presents the requested read‑only status.

        The clone **shares** the same CAS instance and internal state;
        no flushing, network traffic or async work is done.
        """
        # Fast path
        if read_only == self.read_only:
            return self  # Same mode, return same instance

        # Creates a *bare* instance without running its __init__
        clone = type(self).__new__(type(self))

        clone.cas = self.cas
        clone._root_cid = self._root_cid
        clone._root_obj = self._root_obj

        clone._resize_lock = self._resize_lock
        clone._resize_complete = self._resize_complete
        clone._shard_locks = self._shard_locks

        clone._shard_data_cache = self._shard_data_cache
        clone._pending_shard_loads = self._pending_shard_loads

        clone._array_shape = self._array_shape
        clone._chunk_shape = self._chunk_shape
        clone._chunks_per_dim = self._chunks_per_dim
        clone._chunks_per_shard = self._chunks_per_shard
        clone._num_shards = self._num_shards
        clone._total_chunks = self._total_chunks

        clone._dirty_root = self._dirty_root

        # Re‑initialise the zarr base class so that Zarr sees the flag
        zarr.abc.store.Store.__init__(clone, read_only=read_only)
        return clone

    def __eq__(self, other: object) -> bool:
        """Stores are equal when they point at the same root CID."""
        if not isinstance(other, ShardedZarrStore):
            return False
        # For equality, root CID is primary. Config like chunks_per_shard is part of that root's identity.
        return self._root_cid == other._root_cid

    async def flush(self) -> str:
        """
        Write dirty shards and, if anything changed, the root object to the CAS.

        Each dirty shard is DAG-CBOR encoded and saved, and its new CID is
        recorded in the root's ``shard_cids``. A changed root is then
        re-encoded and saved, and ``_root_cid`` is updated. If nothing is
        dirty, the current root CID is returned unchanged.

        Returns:
            str: The root CID after flushing.
        """
        # Only the snapshot of the dirty set is taken under the cache lock.
        # The cache's own get/mark_clean (awaited below) may acquire that same
        # lock themselves, and asyncio.Lock is not re-entrant.
        async with self._shard_data_cache._cache_lock:
            dirty_shards = sorted(self._shard_data_cache._dirty_shards)

        for shard_idx in dirty_shards:
            # Hold the shard lock so that a concurrent set_pointer/delete cannot
            # modify and re-dirty the shard between encoding and mark_clean.
            async with self._shard_locks[shard_idx]:
                shard_data_list = await self._shard_data_cache.get(shard_idx)
                if shard_data_list is None:
                    raise RuntimeError(f"Dirty shard {shard_idx} not found in cache")

                shard_data_bytes = dag_cbor.encode(shard_data_list)
                new_shard_cid_obj = await self.cas.save(
                    shard_data_bytes,
                    codec="dag-cbor",
                )

                # Re-read after the awaits: resize_store may have replaced the list.
                shard_cids = self._root_obj["chunks"]["shard_cids"]
                if shard_idx < len(shard_cids):
                    if shard_cids[shard_idx] != new_shard_cid_obj:
                        shard_cids[shard_idx] = new_shard_cid_obj  # store the CID object
                        self._dirty_root = True
                # Otherwise resize_store shrank the grid and this shard has no
                # slot in the root any more; clearing it keeps flush from failing.
                await self._shard_data_cache.mark_clean(shard_idx)

        if self._dirty_root:
            # Ensure all metadata CIDs are CID objects for correct encoding
            self._root_obj["metadata"] = {
                k: (CID.decode(v) if isinstance(v, str) else v)
                for k, v in self._root_obj["metadata"].items()
            }
            root_obj_bytes = dag_cbor.encode(self._root_obj)
            # Clear the flag before awaiting so that changes made while the
            # save is in flight mark the root dirty again; restore on failure.
            self._dirty_root = False
            try:
                new_root_cid = await self.cas.save(root_obj_bytes, codec="dag-cbor")
            except BaseException:
                self._dirty_root = True
                raise
            self._root_cid = str(new_root_cid)

        # Ignore because root_cid will always exist after initialization or flush.
        return self._root_cid  # type: ignore[return-value]

    async def get(
        self,
        key: str,
        prototype: zarr.core.buffer.BufferPrototype,
        byte_range: Optional[zarr.abc.store.ByteRequest] = None,
    ) -> Optional[zarr.core.buffer.Buffer]:
        """
        Return the value stored under ``key``, or None if it is absent.

        Metadata keys are loaded whole; a byte range for them raises
        ValueError. Chunk keys are resolved through their shard, and any byte
        range is forwarded to ``cas.load`` as offset/length/suffix.
        """
        chunk_coords = self._parse_chunk_key(key)
        # Metadata request
        if chunk_coords is None:
            metadata_cid_obj = self._root_obj["metadata"].get(key)
            if metadata_cid_obj is None:
                return None
            if byte_range is not None:
                raise ValueError(
                    "Byte range requests are not supported for metadata keys."
                )
            data = await self.cas.load(str(metadata_cid_obj))
            return prototype.buffer.from_bytes(data)

        # Chunk data request
        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

        # This will load the full shard into cache if it's not already there.
        async with self._shard_locks[shard_idx]:
            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)

        chunk_cid_obj = target_shard_list[index_in_shard]
        if chunk_cid_obj is None:
            return None  # Chunk is empty/doesn't exist.

        req_offset = None
        req_length = None
        req_suffix = None

        if byte_range is not None:
            if isinstance(byte_range, RangeByteRequest):
                req_offset = byte_range.start
                if byte_range.end is not None:
                    if byte_range.start > byte_range.end:
                        raise ValueError(
                            f"Byte range start ({byte_range.start}) cannot be greater than end ({byte_range.end})"
                        )
                    req_length = byte_range.end - byte_range.start
            elif isinstance(byte_range, OffsetByteRequest):
                req_offset = byte_range.offset
            elif isinstance(byte_range, SuffixByteRequest):
                req_suffix = byte_range.suffix
        data = await self.cas.load(
            str(chunk_cid_obj), offset=req_offset, length=req_length, suffix=req_suffix
        )
        return prototype.buffer.from_bytes(data)

    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        """
        Save ``value`` to the CAS and point ``key`` at it.

        Writing a non-root ``zarr.json`` outside the coordinate prefixes whose
        ``shape`` differs from the current array shape first resizes the
        store's grid. Other writers are paused while the resize runs.

        Raises:
            PermissionError: If the store is read-only.
            ValueError: If a resize-triggering ``zarr.json`` has no ``shape``.
            RuntimeError: If saving the data or recording its pointer fails.
        """
        if self.read_only:
            raise PermissionError("Cannot write to a read-only store.")
        await self._resize_complete.wait()

        if (
            key.endswith("zarr.json")
            and key != "zarr.json"
            and not key.startswith(self._COORDINATE_KEY_PREFIXES)
        ):
            metadata_json = json.loads(value.to_bytes().decode("utf-8"))
            new_array_shape = metadata_json.get("shape")
            if not new_array_shape:
                raise ValueError("Shape not found in metadata.")
            if tuple(new_array_shape) != self._array_shape:
                async with self._resize_lock:
                    # Double-check after acquiring the lock, in case another task
                    # just finished this exact resize while we were waiting.
                    if tuple(new_array_shape) != self._array_shape:
                        # Block all other tasks until resize is complete.
                        self._resize_complete.clear()
                        try:
                            await self.resize_store(new_shape=tuple(new_array_shape))
                        finally:
                            # All waiting tasks will now un-pause and proceed safely.
                            self._resize_complete.set()

        raw_data_bytes = value.to_bytes()
        # Save the data to CAS first to get its CID.
        try:
            data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
            await self.set_pointer(key, str(data_cid_obj))
        except Exception as e:
            raise RuntimeError(f"Failed to save data for key {key}: {e}") from e

    async def set_pointer(self, key: str, pointer: str) -> None:
        """
        Record the CID string ``pointer`` for ``key``, without uploading data.

        Metadata keys update the root's metadata mapping. Chunk keys update
        their shard slot and mark the shard dirty when the value changes.
        """
        chunk_coords = self._parse_chunk_key(key)

        pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object

        if chunk_coords is None:  # Metadata key
            self._root_obj["metadata"][key] = pointer_cid_obj
            self._dirty_root = True
            return None

        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

        async with self._shard_locks[shard_idx]:
            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)

            if target_shard_list[index_in_shard] != pointer_cid_obj:
                target_shard_list[index_in_shard] = pointer_cid_obj
                await self._shard_data_cache.mark_dirty(shard_idx)
        return None

    async def exists(self, key: str) -> bool:
        """
        Return whether ``key`` has a stored value.

        Malformed or out-of-range chunk keys yield False.
        """
        try:
            chunk_coords = self._parse_chunk_key(key)
            if chunk_coords is None:  # Metadata
                return key in self._root_obj.get("metadata", {})
            linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
            shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)
            # Same locking as get/set_pointer/delete: otherwise an empty shard
            # initialised here could replace a list another task just wrote into.
            async with self._shard_locks[shard_idx]:
                target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
                return target_shard_list[index_in_shard] is not None
        except (ValueError, IndexError, KeyError):
            return False

    @property
    def supports_writes(self) -> bool:
        """True unless the store is read-only."""
        return not self.read_only

    @property
    def supports_partial_writes(self) -> bool:
        """Always False; `set_partial_values` raises NotImplementedError."""
        return False  # Each chunk CID is written atomically into a shard slot

    @property
    def supports_deletes(self) -> bool:
        """True unless the store is read-only."""
        return not self.read_only

    async def delete(self, key: str) -> None:
        """
        Remove ``key``.

        Deleting a missing metadata key raises KeyError. Deleting a missing
        chunk is a no-op.
        """
        if self.read_only:
            raise PermissionError("Cannot delete from a read-only store.")

        chunk_coords = self._parse_chunk_key(key)
        if chunk_coords is None:  # Metadata
            metadata = self._root_obj["metadata"]
            if key not in metadata:
                raise KeyError(f"Metadata key '{key}' not found.")
            del metadata[key]
            self._dirty_root = True
            return None

        linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
        shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

        async with self._shard_locks[shard_idx]:
            target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
            if target_shard_list[index_in_shard] is not None:
                target_shard_list[index_in_shard] = None
                await self._shard_data_cache.mark_dirty(shard_idx)

    @property
    def supports_listing(self) -> bool:
        """Listing is supported (metadata keys only; see `list`)."""
        return True

    async def list(self) -> AsyncIterator[str]:
        """Yield every metadata key. Chunk keys in shards are not listed."""
        for key in list(self._root_obj.get("metadata", {})):
            yield key

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        """Yield the metadata keys that start with ``prefix``."""
        async for key in self.list():
            if key.startswith(prefix):
                yield key

    async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
        """
        Copy the non-empty chunk pointers of another ShardedZarrStore into this grid.

        Each source chunk at local coordinates ``c`` is written to
        ``c + chunk_offset`` here (in chunk units). Only CIDs are copied; chunk
        data is not re-uploaded. Touched shards are marked dirty and are
        persisted by `flush`.

        Raises:
            PermissionError: If this store is read-only.
            ValueError: If ``chunk_offset`` has the wrong rank, or the grafted
                store uses a different chunk shape.
            IndexError: If the shifted source grid does not fit in this grid.
        """
        if self.read_only:
            raise PermissionError("Cannot graft onto a read-only store.")
        if len(chunk_offset) != len(self._chunks_per_dim):
            raise ValueError(
                f"chunk_offset has {len(chunk_offset)} dimensions but the store has {len(self._chunks_per_dim)}."
            )

        store_to_graft = await ShardedZarrStore.open(
            cas=self.cas, read_only=True, root_cid=store_to_graft_cid
        )
        # Offsets are in chunk units, so they are only meaningful when both
        # stores chunk the data identically.
        if tuple(store_to_graft._chunk_shape) != tuple(self._chunk_shape):
            raise ValueError(
                f"Cannot graft a store with chunk shape {tuple(store_to_graft._chunk_shape)} onto chunk shape {tuple(self._chunk_shape)}."
            )

        source_chunk_grid = store_to_graft._chunks_per_dim
        # Validate up front. An out-of-range coordinate would otherwise be
        # folded into another chunk's linear index and silently overwrite it.
        if store_to_graft._total_chunks > 0:
            for dim, (offset, n_src, n_dst) in enumerate(
                zip(chunk_offset, source_chunk_grid, self._chunks_per_dim)
            ):
                if offset < 0 or offset + n_src > n_dst:
                    raise IndexError(
                        f"Grafted chunks [{offset}, {offset + n_src}) at dimension {dim} are out of bounds for dimension size {n_dst}."
                    )

        for local_coords in itertools.product(*[range(s) for s in source_chunk_grid]):
            linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
            local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
                linear_local_index
            )
            # Load the source shard into its cache
            source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
                local_shard_idx
            )

            pointer_cid_obj = source_shard_list[index_in_local_shard]
            if pointer_cid_obj is None:
                continue

            # Calculate global coordinates and write to the main store's index
            global_coords = tuple(
                c_local + c_offset
                for c_local, c_offset in zip(local_coords, chunk_offset)
            )
            linear_global_index = self._get_linear_chunk_index(global_coords)
            global_shard_idx, index_in_global_shard = self._get_shard_info(
                linear_global_index
            )

            async with self._shard_locks[global_shard_idx]:
                target_shard_list = await self._load_or_initialize_shard_cache(
                    global_shard_idx
                )
                if target_shard_list[index_in_global_shard] != pointer_cid_obj:
                    target_shard_list[index_in_global_shard] = pointer_cid_obj
                    await self._shard_data_cache.mark_dirty(global_shard_idx)

    async def resize_store(self, new_shape: Tuple[int, ...]):
        """
        Resizes the store's main shard index to accommodate a new overall array shape.

        This is a metadata-only operation on the store's root object. It is
        used when doing skeleton writes or appends via xarray where the array
        shape changes. ``shard_cids`` is padded with None or truncated to the
        new shard count.

        Raises:
            PermissionError: If the store is read-only.
            RuntimeError: If the store geometry was never initialised.
            ValueError: If the rank changes or a dimension is negative.
        """
        if self.read_only:
            raise PermissionError("Cannot resize a read-only store.")
        # getattr: these are annotation-only in __init__, so they may not exist.
        if (
            getattr(self, "_chunk_shape", None) is None
            or getattr(self, "_chunks_per_shard", None) is None
            or getattr(self, "_array_shape", None) is None
        ):
            raise RuntimeError("Store is not properly initialized for resizing.")
        new_shape = tuple(new_shape)
        if len(new_shape) != len(self._array_shape):
            raise ValueError(
                "New shape must have the same number of dimensions as the old shape."
            )
        # Checked before any state changes so a bad shape leaves the store intact.
        if not all(s >= 0 for s in new_shape):
            raise ValueError("All array_shape dimensions must be non-negative.")

        old_num_shards = self._num_shards or 0
        self._array_shape = new_shape
        self.__update_geometry()

        self._root_obj["chunks"]["array_shape"] = list(self._array_shape)
        shard_cids = self._root_obj["chunks"]["shard_cids"]
        if self._num_shards > old_num_shards:
            shard_cids.extend([None] * (self._num_shards - old_num_shards))
        elif self._num_shards < old_num_shards:
            self._root_obj["chunks"]["shard_cids"] = shard_cids[: self._num_shards]

        self._dirty_root = True

    async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
        """
        Resizes the Zarr metadata for a specific variable (its ``zarr.json``).

        This does NOT change the store's main shard index.

        Raises:
            PermissionError: If the store is read-only.
            KeyError: If ``<variable_name>/zarr.json`` is not present.
        """
        if self.read_only:
            raise PermissionError("Cannot resize a read-only store.")

        zarr_metadata_key = f"{variable_name}/zarr.json"

        old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key)
        if not old_zarr_metadata_cid:
            raise KeyError(
                f"Cannot find metadata for key '{zarr_metadata_key}' to resize."
            )

        # Metadata entries may be CID objects; load them by string CID as get() does.
        old_zarr_metadata_bytes = await self.cas.load(str(old_zarr_metadata_cid))
        zarr_metadata_json = json.loads(old_zarr_metadata_bytes)

        zarr_metadata_json["shape"] = list(new_shape)

        new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode(
            "utf-8"
        )
        # Metadata is a raw blob of bytes
        new_zarr_metadata_cid = await self.cas.save(
            new_zarr_metadata_bytes, codec="raw"
        )

        self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid
        self._dirty_root = True

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        """
        Yield the distinct first path components of the metadata keys.

        Only the root listing (``prefix == ""``) is implemented. Chunk keys
        are not included because `list` yields only metadata keys.
        """
        seen: Set[str] = set()
        if prefix == "":
            async for key in self.list():  # Iterates metadata keys
                # "group1/zarr.json" -> "group1"; "zarr.json" -> "zarr.json"
                first_component = key.split("/", 1)[0]
                if first_component not in seen:
                    seen.add(first_component)
                    yield first_component
        else:
            raise NotImplementedError("Listing with a prefix is not implemented yet.")
Implements the Zarr Store API using a sharded layout for chunk CIDs.
This store divides the flat index of chunk CIDs into multiple "shards". Each shard is a DAG-CBOR array where each element is either a CID link to a chunk or a null value if the chunk is empty. This structure allows for efficient traversal by IPLD-aware systems.
The store's root object contains:
- A dictionary mapping metadata keys (like 'zarr.json') to their CIDs.
- A list of CIDs, where each CID points to a shard object.
- Sharding configuration details (e.g., chunks_per_shard).
def __init__(
    self,
    cas: ContentAddressedStore,
    read_only: bool,
    root_cid: Optional[str] = None,
    *,
    max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
):
    """
    Use the async `open()` classmethod to instantiate this class.

    The constructor only prepares in-memory state: the CAS handle, locks,
    the bounded shard cache and placeholder geometry. The root object and
    grid shapes are filled in when the store is opened.
    """
    super().__init__(read_only=read_only)

    # Backing content-addressed store and the root CID it was opened at.
    self.cas = cas
    self._root_cid = root_cid
    self._root_obj: dict

    # Concurrency primitives. The resize event starts "set", so writers
    # proceed until a resize actually begins.
    self._resize_lock = asyncio.Lock()
    resize_done = asyncio.Event()
    resize_done.set()
    self._resize_complete = resize_done
    self._shard_locks: DefaultDict[int, asyncio.Lock] = defaultdict(asyncio.Lock)

    # Memory-bounded shard cache plus the table of in-flight shard fetches.
    self._shard_data_cache = MemoryBoundedLRUCache(max_cache_memory_bytes)
    self._pending_shard_loads: Dict[int, asyncio.Event] = {}

    # Grid geometry: declared here, populated once the root is known.
    self._array_shape: Tuple[int, ...]
    self._chunk_shape: Tuple[int, ...]
    self._chunks_per_dim: Tuple[int, ...]
    self._chunks_per_shard: int
    self._num_shards: int = 0
    self._total_chunks: int = 0

    self._dirty_root = False
Use the async open() classmethod to instantiate this class.
@classmethod
async def open(
    cls,
    cas: ContentAddressedStore,
    read_only: bool,
    root_cid: Optional[str] = None,
    *,
    array_shape: Optional[Tuple[int, ...]] = None,
    chunk_shape: Optional[Tuple[int, ...]] = None,
    chunks_per_shard: Optional[int] = None,
    max_cache_memory_bytes: int = 100 * 1024 * 1024,  # 100MB default
) -> "ShardedZarrStore":
    """
    Asynchronously opens an existing ShardedZarrStore or initializes a new one.

    Given ``root_cid``, the existing root is loaded from the CAS. Otherwise
    a writable store is created from ``array_shape``, ``chunk_shape`` and a
    positive integer ``chunks_per_shard``.

    Raises:
        ValueError: If a read-only store has no ``root_cid``, or if the
            arguments for a new store are missing or invalid.
    """
    store = cls(
        cas, read_only, root_cid, max_cache_memory_bytes=max_cache_memory_bytes
    )

    if root_cid:
        await store._load_root_from_cid()
        return store

    if read_only:
        raise ValueError("root_cid must be provided for a read-only store.")

    if array_shape is None or chunk_shape is None:
        raise ValueError(
            "array_shape and chunk_shape must be provided for a new store."
        )
    shard_size_ok = isinstance(chunks_per_shard, int) and chunks_per_shard > 0
    if not shard_size_ok:
        raise ValueError("chunks_per_shard must be a positive integer.")

    store._initialize_new_root(array_shape, chunk_shape, chunks_per_shard)
    return store
Asynchronously opens an existing ShardedZarrStore or initializes a new one.
async def set_partial_values(
    self, key_start_values: Iterable[Tuple[str, int, BytesLike]]
) -> None:
    """Reject partial writes; values can only be replaced whole.

    Raises:
        NotImplementedError: always.
    """
    message = "Partial writes are not supported by ShardedZarrStore."
    raise NotImplementedError(message)
Store values at a given key, starting at byte range_start.
Parameters
key_start_values : list[tuple[str, int, BytesLike]] Set of (key, range_start, value) triples. A key may occur multiple times with different range_starts; for the same key, the ranges (each range_start plus the length of its value) must not overlap.
async def get_partial_values(
    self,
    prototype: zarr.core.buffer.BufferPrototype,
    key_ranges: Iterable[Tuple[str, zarr.abc.store.ByteRequest | None]],
) -> List[Optional[zarr.core.buffer.Buffer]]:
    """Fetch several (possibly byte-ranged) values concurrently.

    Every ``(key, byte_range)`` pair is delegated to ``get`` and the requests
    run together under ``asyncio.gather``. Results keep the order of
    ``key_ranges``; entries are ``None`` wherever ``get`` returns ``None``.
    """
    return await asyncio.gather(
        *(self.get(key, prototype, rng) for key, rng in key_ranges)
    )
Retrieve possibly partial values from given key_ranges.
Parameters
prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. key_ranges : Iterable[tuple[str, ByteRequest | None]] Ordered set of (key, byte range) pairs; a key may occur multiple times with different ranges.
Returns
List of values in the order of key_ranges; contains None for missing keys.
def with_read_only(self, read_only: bool = False) -> "ShardedZarrStore":
    """
    Return a store that presents the requested read-only status.

    When ``read_only`` already matches, ``self`` is returned. Otherwise a
    *shallow* clone is built without running ``__init__``. The clone shares
    the CAS, the locks/events, the shard cache, the pending-load map and the
    root object with this store. ``_root_cid`` and ``_dirty_root`` are copied
    by value as they are right now. No flushing, network traffic or async
    work is performed.
    """
    if read_only == self.read_only:
        return self

    cls = type(self)
    # Bypass __init__ so the clone reuses, rather than recreates, our state.
    twin = cls.__new__(cls)
    for attr in (
        "cas",
        "_root_cid",
        "_root_obj",
        "_resize_lock",
        "_resize_complete",
        "_shard_locks",
        "_shard_data_cache",
        "_pending_shard_loads",
        "_array_shape",
        "_chunk_shape",
        "_chunks_per_dim",
        "_chunks_per_shard",
        "_num_shards",
        "_total_chunks",
        "_dirty_root",
    ):
        setattr(twin, attr, getattr(self, attr))

    # Re-run the zarr base initialiser so Zarr itself observes the new flag.
    zarr.abc.store.Store.__init__(twin, read_only=read_only)
    return twin
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same CAS instance and internal state; no flushing, network traffic or async work is done.
async def flush(self) -> str:
    """Persist every dirty shard, then the root object if it changed.

    Each dirty shard's cached list of chunk CIDs (or ``None``) is DAG-CBOR
    encoded and saved to the CAS. Its slot in ``root["chunks"]["shard_cids"]``
    is replaced only when the resulting CID differs, and the shard is then
    marked clean. If the root is dirty, metadata values held as strings are
    decoded into ``CID`` objects, the root is DAG-CBOR encoded and saved, and
    ``_root_cid`` is updated.

    Returns:
        The current root CID string.

    Raises:
        RuntimeError: if a shard recorded as dirty is missing from the cache.
    """
    # Snapshot the dirty set under the cache lock so it cannot change mid-copy.
    async with self._shard_data_cache._cache_lock:
        pending = list(self._shard_data_cache._dirty_shards)

    for idx in sorted(pending):
        entries = await self._shard_data_cache.get(idx)
        if entries is None:
            raise RuntimeError(f"Dirty shard {idx} not found in cache")

        shard_cid = await self.cas.save(dag_cbor.encode(entries), codec="dag-cbor")

        slots = self._root_obj["chunks"]["shard_cids"]
        if slots[idx] != shard_cid:
            slots[idx] = shard_cid  # keep the CID object itself
            self._dirty_root = True
        await self._shard_data_cache.mark_clean(idx)

    if not self._dirty_root:
        # root_cid always exists after initialization or a previous flush.
        return self._root_cid  # type: ignore[return-value]

    # Normalise metadata pointers to CID objects so they encode as links.
    metadata = self._root_obj["metadata"]
    self._root_obj["metadata"] = {
        name: CID.decode(ptr) if isinstance(ptr, str) else ptr
        for name, ptr in metadata.items()
    }
    root_cid = await self.cas.save(dag_cbor.encode(self._root_obj), codec="dag-cbor")
    self._root_cid = str(root_cid)
    self._dirty_root = False
    return self._root_cid
async def get(
    self,
    key: str,
    prototype: zarr.core.buffer.BufferPrototype,
    byte_range: Optional[zarr.abc.store.ByteRequest] = None,
) -> Optional[zarr.core.buffer.Buffer]:
    """Retrieve the value stored under ``key``, optionally byte-ranged.

    Metadata keys are resolved through the root object's ``"metadata"`` map;
    byte ranges are not allowed for them. Chunk keys are resolved through
    their shard, which is loaded into the cache under the shard lock if
    needed. ``RangeByteRequest`` maps to offset/length,
    ``OffsetByteRequest`` to offset, and ``SuffixByteRequest`` to suffix;
    these are passed to ``cas.load``.

    Returns:
        A buffer built with ``prototype``, or ``None`` if the key is absent.

    Raises:
        ValueError: for a byte range on a metadata key, or a range whose
            start exceeds its end.
    """
    coords = self._parse_chunk_key(key)

    if coords is None:
        meta_cid = self._root_obj["metadata"].get(key)
        if meta_cid is None:
            return None
        if byte_range is not None:
            raise ValueError(
                "Byte range requests are not supported for metadata keys."
            )
        return prototype.buffer.from_bytes(await self.cas.load(str(meta_cid)))

    shard_idx, slot = self._get_shard_info(self._get_linear_chunk_index(coords))

    # Loads the whole shard into the cache if it is not already there.
    async with self._shard_locks[shard_idx]:
        shard_entries = await self._load_or_initialize_shard_cache(shard_idx)
        chunk_cid = shard_entries[slot]

    if chunk_cid is None:
        return None  # Chunk is empty/doesn't exist.

    offset = length = suffix = None
    if byte_range:
        if isinstance(byte_range, RangeByteRequest):
            start, end = byte_range.start, byte_range.end
            offset = start
            if end is not None:
                if start > end:
                    raise ValueError(
                        f"Byte range start ({start}) cannot be greater than end ({end})"
                    )
                length = end - start
        elif isinstance(byte_range, OffsetByteRequest):
            offset = byte_range.offset
        elif isinstance(byte_range, SuffixByteRequest):
            suffix = byte_range.suffix

    payload = await self.cas.load(
        str(chunk_cid), offset=offset, length=length, suffix=suffix
    )
    return prototype.buffer.from_bytes(payload)
Retrieve the value associated with a given key.
Parameters
key : str prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. byte_range : ByteRequest, optional ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved. - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned. - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header. - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.
Returns
Buffer
async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
    """Store a (key, value) pair.

    The value is saved to the CAS as a ``raw`` block, and ``key`` is pointed
    at the resulting CID via ``set_pointer``. When a variable's ``zarr.json``
    is written with a ``shape`` different from the store's current array
    shape, the shard index is resized first (``resize_store``). Other
    writers wait on ``_resize_complete`` while that happens.

    Raises:
        PermissionError: if the store is read-only.
        ValueError: if a variable's ``zarr.json`` has no ``shape``.
        RuntimeError: if saving the value or recording its pointer fails; the
            original exception is chained as ``__cause__``.
    """
    if self.read_only:
        raise PermissionError("Cannot write to a read-only store.")
    # Never write while a resize is rebuilding the shard index.
    await self._resize_complete.wait()

    raw_data_bytes = value.to_bytes()

    # Only per-variable zarr.json documents can trigger a store resize. The
    # root zarr.json and the listed coordinate variables are excluded.
    # NOTE(review): the coordinate names are hard-coded, so any other
    # variable whose zarr.json shape differs (e.g. another 1-D coordinate)
    # resizes the whole store.
    if (
        key.endswith("zarr.json")
        and key != "zarr.json"
        and not key.startswith(("time/", "lat/", "latitude/", "lon/", "longitude/"))
    ):
        metadata_json = json.loads(raw_data_bytes.decode("utf-8"))
        new_array_shape = metadata_json.get("shape")
        if not new_array_shape:
            raise ValueError("Shape not found in metadata.")
        if tuple(new_array_shape) != self._array_shape:
            async with self._resize_lock:
                # Double-check after acquiring the lock, in case another task
                # just finished this exact resize while we were waiting.
                if tuple(new_array_shape) != self._array_shape:
                    # Block all other writers until the resize is complete.
                    self._resize_complete.clear()
                    try:
                        await self.resize_store(new_shape=tuple(new_array_shape))
                    finally:
                        # Waiting tasks resume once the index is consistent.
                        self._resize_complete.set()

    # Save the data to CAS first to get its CID, then record the pointer.
    # Metadata is often saved as 'raw', chunks as well unless compressed.
    try:
        data_cid_obj = await self.cas.save(raw_data_bytes, codec="raw")
        await self.set_pointer(key, str(data_cid_obj))
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise RuntimeError(f"Failed to save data for key {key}: {e}") from e
    return None
Store a (key, value) pair.
Parameters
key : str value : Buffer
async def set_pointer(self, key: str, pointer: str) -> None:
    """Point ``key`` at an already-stored block without copying any data.

    Metadata keys are recorded in the root object's ``"metadata"`` map, and
    the root is marked dirty. Chunk keys are written into their shard slot
    under the shard lock; the shard is marked dirty only if the pointer
    actually changes.

    Args:
        key: Zarr key (a metadata document or a chunk key).
        pointer: CID string of the block to reference.

    Raises:
        PermissionError: if the store is read-only.
    """
    # set() checks this too, but set_pointer is also a public entry point
    # (convert_hamt_to_sharded calls it directly), so guard it here as well.
    if self.read_only:
        raise PermissionError("Cannot write to a read-only store.")

    chunk_coords = self._parse_chunk_key(key)
    pointer_cid_obj = CID.decode(pointer)  # Convert string to CID object

    if chunk_coords is None:  # Metadata key
        self._root_obj["metadata"][key] = pointer_cid_obj
        self._dirty_root = True
        return None

    linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
    shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

    async with self._shard_locks[shard_idx]:
        target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
        if target_shard_list[index_in_shard] != pointer_cid_obj:
            target_shard_list[index_in_shard] = pointer_cid_obj
            await self._shard_data_cache.mark_dirty(shard_idx)
    return None
async def exists(self, key: str) -> bool:
    """Check whether ``key`` exists in the store.

    Metadata keys are looked up in the root object's ``"metadata"`` map.
    A chunk key exists when its shard slot holds a non-``None`` CID; the
    shard is loaded into the cache if necessary. Any ``ValueError``,
    ``IndexError`` or ``KeyError`` raised during the lookup (e.g. malformed
    or out-of-range keys) is reported as ``False``.
    """
    try:
        coords = self._parse_chunk_key(key)
        if coords is not None:
            linear = self._get_linear_chunk_index(coords)
            shard_idx, slot = self._get_shard_info(linear)
            entries = await self._load_or_initialize_shard_cache(shard_idx)
            found = entries[slot] is not None
        else:
            found = key in self._root_obj.get("metadata", {})
    except (KeyError, IndexError, ValueError):
        found = False
    return found
Check if a key exists in the store.
Parameters
key : str
Returns
bool
@property
def supports_partial_writes(self) -> bool:
    """Does the store support partial writes?

    No. Each chunk is saved as its own block and only its CID is placed
    into a shard slot, so values are always replaced whole (see
    ``set_partial_values``, which raises ``NotImplementedError``).
    """
    partial_writes_supported = False
    return partial_writes_supported
Does the store support partial writes?
async def delete(self, key: str) -> None:
    """Remove a key from the store.

    A metadata key is removed from the root object's ``"metadata"`` map, and
    the root is marked dirty. A chunk key has its shard slot reset to
    ``None``. The shard is marked dirty only if the slot was occupied, so
    deleting an absent chunk is a no-op.

    Raises:
        PermissionError: if the store is read-only.
        KeyError: if ``key`` is a metadata key that is not present.
    """
    if self.read_only:
        raise PermissionError("Cannot delete from a read-only store.")

    chunk_coords = self._parse_chunk_key(key)
    if chunk_coords is None:  # Metadata
        metadata = self._root_obj["metadata"]
        # Test membership, not the truthiness of the popped value. A
        # present-but-falsy entry must be removed AND mark the root dirty,
        # rather than vanish silently while a KeyError is raised.
        if key not in metadata:
            raise KeyError(f"Metadata key '{key}' not found.")
        del metadata[key]
        self._dirty_root = True
        return None

    linear_chunk_index = self._get_linear_chunk_index(chunk_coords)
    shard_idx, index_in_shard = self._get_shard_info(linear_chunk_index)

    async with self._shard_locks[shard_idx]:
        target_shard_list = await self._load_or_initialize_shard_cache(shard_idx)
        if target_shard_list[index_in_shard] is not None:
            target_shard_list[index_in_shard] = None
            await self._shard_data_cache.mark_dirty(shard_idx)
    return None
Remove a key from the store.
Parameters
key : str
async def list(self) -> AsyncIterator[str]:
    """Retrieve all keys in the store.

    Only the keys of the root object's ``"metadata"`` map are yielded;
    chunk keys are not enumerated. The key set is copied before iteration,
    so mutating the metadata while consuming the iterator is safe.
    """
    snapshot = tuple(self._root_obj.get("metadata", {}))
    for name in snapshot:
        yield name
Retrieve all keys in the store.
Returns
AsyncIterator[str]
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    """Retrieve all keys from ``list()`` that begin with ``prefix``.

    Keys are yielded unchanged, i.e. relative to the root of the store, in
    the order ``list()`` produces them.
    """
    keys = self.list()
    async for candidate in keys:
        if not candidate.startswith(prefix):
            continue
        yield candidate
Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.
Parameters
prefix : str
Returns
AsyncIterator[str]
async def graft_store(self, store_to_graft_cid: str, chunk_offset: Tuple[int, ...]):
    """Graft another ShardedZarrStore's chunks into this store by reference.

    The source store is opened read-only on the same CAS. Each non-empty
    chunk at local chunk coordinates ``c`` in the source is referenced at
    chunk coordinates ``c + chunk_offset`` in this store. Only CIDs are
    copied; no chunk data is re-uploaded. Destination shards whose slot
    changes are marked dirty, and ``flush()`` persists them.

    Args:
        store_to_graft_cid: Root CID of the store to graft in.
        chunk_offset: Per-dimension offset, measured in chunks (not elements).

    Raises:
        PermissionError: if this store is read-only.
        ValueError: if ``chunk_offset`` does not have exactly one entry per
            source dimension, or if the two stores use different chunk shapes.
    """
    if self.read_only:
        raise PermissionError("Cannot graft onto a read-only store.")

    store_to_graft = await ShardedZarrStore.open(
        cas=self.cas, read_only=True, root_cid=store_to_graft_cid
    )
    source_chunk_grid = store_to_graft._chunks_per_dim

    # zip() below would silently truncate a mismatched offset and write the
    # chunks to the wrong coordinates.
    if len(chunk_offset) != len(source_chunk_grid):
        raise ValueError(
            f"chunk_offset has {len(chunk_offset)} dimensions, but the grafted "
            f"store has {len(source_chunk_grid)}."
        )
    # Chunk CIDs are reused verbatim, so they are only valid in a store that
    # uses the same chunk shape.
    if tuple(store_to_graft._chunk_shape) != tuple(self._chunk_shape):
        raise ValueError(
            f"Cannot graft a store with chunk shape {tuple(store_to_graft._chunk_shape)} "
            f"onto a store with chunk shape {tuple(self._chunk_shape)}."
        )

    for local_coords in itertools.product(*(range(s) for s in source_chunk_grid)):
        linear_local_index = store_to_graft._get_linear_chunk_index(local_coords)
        local_shard_idx, index_in_local_shard = store_to_graft._get_shard_info(
            linear_local_index
        )
        # Load the source shard into the source store's cache.
        source_shard_list = await store_to_graft._load_or_initialize_shard_cache(
            local_shard_idx
        )

        pointer_cid_obj = source_shard_list[index_in_local_shard]
        if pointer_cid_obj is None:
            continue

        # Translate to this store's chunk grid and write into its index.
        global_coords = tuple(
            c_local + c_offset
            for c_local, c_offset in zip(local_coords, chunk_offset)
        )
        linear_global_index = self._get_linear_chunk_index(global_coords)
        global_shard_idx, index_in_global_shard = self._get_shard_info(
            linear_global_index
        )

        async with self._shard_locks[global_shard_idx]:
            target_shard_list = await self._load_or_initialize_shard_cache(
                global_shard_idx
            )
            if target_shard_list[index_in_global_shard] != pointer_cid_obj:
                target_shard_list[index_in_global_shard] = pointer_cid_obj
                await self._shard_data_cache.mark_dirty(global_shard_idx)
async def resize_store(self, new_shape: Tuple[int, ...]):
    """
    Resizes the store's main shard index to accommodate a new overall array shape.
    This is a metadata-only operation on the store's root object.
    Used when doing skeleton writes or appends via xarray where the array shape changes.

    Recomputes the chunk grid (chunks per dimension, total chunks, number of
    shards) and writes the new shape into ``root["chunks"]["array_shape"]``.
    It then pads ``root["chunks"]["shard_cids"]`` with ``None``, or truncates
    it, to exactly the new shard count, and marks the root dirty.

    NOTE(review): chunk slots are located through ``_get_linear_chunk_index``
    over ``_chunks_per_dim``. If that index is row-major (not visible here —
    confirm), changing any dimension other than the first remaps existing
    chunk pointers to different chunks.
    NOTE(review): shards dropped by a shrink are not evicted from the shard
    cache here. If one is still dirty, ``flush()`` would index past the end
    of ``shard_cids``.

    Args:
        new_shape: New overall array shape; must keep the same rank.

    Raises:
        PermissionError: if the store is read-only.
        RuntimeError: if the store's geometry has not been initialized.
        ValueError: if ``new_shape`` has a different number of dimensions.
    """
    if self.read_only:
        raise PermissionError("Cannot resize a read-only store.")
    # getattr(): __init__ only annotates these attributes without assigning
    # them. Plain attribute access on an uninitialized store would therefore
    # raise AttributeError instead of reaching this guard.
    chunk_shape = getattr(self, "_chunk_shape", None)
    chunks_per_shard = getattr(self, "_chunks_per_shard", None)
    old_shape = getattr(self, "_array_shape", None)
    if chunk_shape is None or chunks_per_shard is None or old_shape is None:
        raise RuntimeError("Store is not properly initialized for resizing.")
    if len(new_shape) != len(old_shape):
        raise ValueError(
            "New shape must have the same number of dimensions as the old shape."
        )

    self._array_shape = tuple(new_shape)
    # Exact integer ceiling division; float division loses precision for
    # very large extents.
    self._chunks_per_dim = tuple(
        -(-extent // size) if size > 0 else 0
        for extent, size in zip(self._array_shape, chunk_shape)
    )
    self._total_chunks = math.prod(self._chunks_per_dim)
    self._num_shards = (
        (self._total_chunks + chunks_per_shard - 1) // chunks_per_shard
        if self._total_chunks > 0
        else 0
    )

    root_chunks = self._root_obj["chunks"]
    root_chunks["array_shape"] = list(self._array_shape)
    # Size by the list's actual length so it always ends with exactly
    # _num_shards entries.
    shard_cids = root_chunks["shard_cids"]
    if len(shard_cids) < self._num_shards:
        shard_cids.extend([None] * (self._num_shards - len(shard_cids)))
    elif len(shard_cids) > self._num_shards:
        root_chunks["shard_cids"] = shard_cids[: self._num_shards]

    self._dirty_root = True
Resizes the store's main shard index to accommodate a new overall array shape. This is a metadata-only operation on the store's root object. Used when doing skeleton writes or appends via xarray where the array shape changes.
async def resize_variable(self, variable_name: str, new_shape: Tuple[int, ...]):
    """
    Resizes the Zarr metadata for a specific variable (its ``zarr.json`` document).
    This does NOT change the store's main shard index (see ``resize_store``).

    The document stored under ``f"{variable_name}/zarr.json"`` is loaded from
    the CAS and its ``shape`` is replaced. It is re-serialised as indented
    JSON and saved as a new ``raw`` block. The root object's metadata entry
    is then repointed at the new CID, and the root is marked dirty.

    Raises:
        PermissionError: if the store is read-only.
        KeyError: if no metadata entry exists for the variable.
    """
    if self.read_only:
        raise PermissionError("Cannot resize a read-only store.")

    zarr_metadata_key = f"{variable_name}/zarr.json"

    old_zarr_metadata_cid = self._root_obj["metadata"].get(zarr_metadata_key)
    if not old_zarr_metadata_cid:
        raise KeyError(
            f"Cannot find metadata for key '{zarr_metadata_key}' to resize."
        )

    # Metadata entries may be CID objects (set_pointer) or strings, so load
    # via the string form exactly as get() does.
    old_zarr_metadata_bytes = await self.cas.load(str(old_zarr_metadata_cid))
    zarr_metadata_json = json.loads(old_zarr_metadata_bytes)

    zarr_metadata_json["shape"] = list(new_shape)

    new_zarr_metadata_bytes = json.dumps(zarr_metadata_json, indent=2).encode(
        "utf-8"
    )
    # Metadata is a raw blob of bytes
    new_zarr_metadata_cid = await self.cas.save(
        new_zarr_metadata_bytes, codec="raw"
    )

    self._root_obj["metadata"][zarr_metadata_key] = new_zarr_metadata_cid
    self._dirty_root = True
Resizes the Zarr metadata for a specific variable (its `<variable_name>/zarr.json` document). This does NOT change the store's main shard index.
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    """Retrieve the immediate children of ``prefix``.

    Yields each distinct name that directly follows ``prefix`` (up to the
    next "/"), once each, in order of first appearance. Names are relative
    to ``prefix``. Like ``list()``, this only sees metadata keys; chunk keys
    are not enumerated. A trailing "/" on ``prefix`` is optional, and ``""``
    lists the store root.

    Args:
        prefix: Group path to list, e.g. ``""`` or ``"group1"``.
    """
    seen: Set[str] = set()
    base = prefix.rstrip("/")
    # Match whole path components only: "ab" must not match "abc/...".
    lead = f"{base}/" if base else ""
    async for key in self.list():  # Iterates metadata keys
        if lead:
            if not key.startswith(lead) or key == lead:
                continue
            relative = key[len(lead):]
        else:
            relative = key
        # e.g. "group1/zarr.json" -> "group1"; "zarr.json" -> "zarr.json"
        first_component = relative.split("/", 1)[0]
        if first_component not in seen:
            seen.add(first_component)
            yield first_component
Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.
Parameters
prefix : str
Returns
AsyncIterator[str]
async def convert_hamt_to_sharded(
    cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
) -> str:
    """
    Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

    Only pointers are migrated: each key's CID from the source HAMT is
    recorded in the new store, so no chunk data is re-uploaded. The
    destination geometry (array shape and chunk shape) is taken from the
    first data variable of the source dataset.

    Args:
        cas: An initialized ContentAddressedStore instance (KuboCAS).
        hamt_root_cid: The root CID of the source ZarrHAMTStore.
        chunks_per_shard: The number of chunks to group into a single shard in the new store.

    Returns:
        The root CID of the newly created ShardedZarrStore.

    Raises:
        ValueError: if the source dataset has no data variables.
    """
    print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---")
    start_time = time.perf_counter()

    # 1. Open the source HAMT store for reading
    print("Opening source HAMT store...")
    hamt_ro = await HAMT.build(
        cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
    )
    source_store = ZarrHAMTStore(hamt_ro, read_only=True)
    source_dataset = xr.open_zarr(store=source_store, consolidated=True)

    # 2. Introspect the source array to get its configuration
    print("Reading metadata from source store...")
    # next() with a default: a bare StopIteration inside a coroutine would
    # surface as an opaque "coroutine raised StopIteration" RuntimeError.
    data_var_name = next(iter(source_dataset.data_vars), None)
    if data_var_name is None:
        raise ValueError(
            f"Source dataset at {hamt_root_cid} has no data variables to derive "
            "the store geometry from."
        )
    data_var = source_dataset[data_var_name]
    array_shape = tuple(source_dataset.sizes[dim] for dim in data_var.dims)
    # Use the variable's own chunking (aligned with its dims). Dataset.chunks
    # raises if other variables are chunked inconsistently.
    chunk_shape = tuple(dim_chunks[0] for dim_chunks in data_var.chunks)

    # 3. Create the destination ShardedZarrStore for writing
    print(
        f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
    )
    dest_store = await ShardedZarrStore.open(
        cas=cas,
        read_only=False,
        array_shape=array_shape,
        chunk_shape=chunk_shape,
        chunks_per_shard=chunks_per_shard,
    )

    print("Destination store initialized.")

    # 4. Iterate and copy all pointers from source to destination
    print("Starting data migration...")
    count = 0
    async for key in hamt_ro.keys():
        count += 1
        # Read the CID stored for this key (metadata or chunk), not its data.
        cid: CID = await hamt_ro.get_pointer(key)
        cid_base32_str = str(cid.encode("base32"))

        # Point the same key at the same block in the destination.
        await dest_store.set_pointer(key, cid_base32_str)
        if count % 200 == 0:  # pragma: no cover
            print(f"Migrated {count} keys...")  # pragma: no cover

    print(f"Migration of {count} total keys complete.")

    # 5. Finalize the new store by flushing it to the CAS
    print("Flushing new store to get final root CID...")
    new_root_cid = await dest_store.flush()
    end_time = time.perf_counter()

    print("\n--- Conversion Complete! ---")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"New ShardedZarrStore Root CID: {new_root_cid}")
    return new_root_cid
Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.
Args: cas: An initialized ContentAddressedStore instance (KuboCAS). hamt_root_cid: The root CID of the source ZarrHAMTStore. chunks_per_shard: The number of chunks to group into a single shard in the new store.
Returns: The root CID of the newly created ShardedZarrStore.