py_hamt
```python
from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
]

print("Running py-hamt from source!")
```
```python
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    This is the default blake3 hash function used for the `HAMT`, with a 32 byte hash size.
    """
    # 32 bytes is the recommended byte size for blake3 and the default, but multihash forces us to explicitly specify
    digest: bytes = b3.digest(input_bytes, size=32)
    raw_bytes: bytes = b3.unwrap(digest)
    return raw_bytes
```
This is the default blake3 hash function used for the `HAMT`, with a 32-byte hash size.
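To supply a different hash function, pass any callable with the same bytes-to-bytes shape. A minimal sketch, assuming only the documented `hash_fn` contract; the `sha256_hashfn` name and the use of `hashlib` are illustrative, not part of py-hamt:

```python
import hashlib

def sha256_hashfn(input_bytes: bytes) -> bytes:
    # 32-byte digest; a whole number of bytes, as the HAMT requires
    return hashlib.sha256(input_bytes).digest()

# hamt = await HAMT.build(cas=some_cas, hash_fn=sha256_hashfn)  # some_cas is a placeholder
```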
````python
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. In both modes the HAMT has some internal performance optimizations.

    Note that in read+write mode, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read-only mode and then read `root_node_id`.

    These optimizations also trade off memory for performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id  # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a python bytes object can only represent whole bytes.

        Theoretically your hash only needs to be a minimum of 1 byte long, and the number of hash collisions must be less than or equal to the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to True if you are only going to be storing python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets, even on a content addressed system, by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read-only mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read-only or read/write mode.

        The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and writes go to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async `__init__`, so this method is provided separately.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a massive blocking operation, you don't want to be running this concurrently with a bunch of other operations
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid; in order to read it at the end of your operations you must first use `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store.
        Takes a stack of nodes; we represent a stack with a list where the first element is the root element and the last element is the top of the stack.
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python.
        If a node ends up being empty, then it is deleted entirely, unless it is the root node.
        Modifies in place.
        """
        # iterate in the reverse direction, this range goes from n-1 to 0, from the bottommost tree node to the root
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                # When removing links, don't worry about two nodes having the same link since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Remove from our stack, continue reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # If not an empty node, just reserialize like normal and replace this one
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # If this is not the last i.e. root node, we need to change the linking of the node prior in the list since we just reserialized
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # automatically skip encoding if the value provided is of the bytes variety
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non bytes when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert
            # This is needed if any buckets overflow and so we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = []
            kvs_queue.append((key, val_ptr))

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    for k in bucket:
                        v_ptr = bucket[k]
                        kvs_queue.append((k, v_ptr))

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError

    async def get(self, key: str) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(pointer)
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore` for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        raw_hash: bytes = self.hash_fn(key.encode())

        current_id: IPLDKind = self.root_node_id
        current_depth: int = 0

        # Don't check if result is none but use a boolean to indicate finding something, this is because None is a possible value of IPLDKind
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id: IPLDKind = current_id
            top_node: Node = await self.node_store.load(top_id)
            map_key: int = extract_bits(raw_hash, current_depth, 8)

            # Check if this key is in one of the buckets
            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    result_ptr = bucket[key]
                    found_a_result = True
                    break

            if isinstance(item, list):
                link: IPLDKind = item[0]
                current_id = link
                current_depth += 1
                continue

            # Nowhere left to go, stop walking down the tree
            break

        if not found_a_result:
            raise KeyError

        return result_ptr

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(list(node.iter_links()))

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count
````
An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.
Use this to store arbitrarily large key-value mappings in your CAS of choice.
For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.
When in read-only mode, the HAMT is both async and thread safe.
#### A note about memory management, read+write and read-only modes

The HAMT can be in either read+write mode or read-only mode. In both modes the HAMT has some internal performance optimizations.

Note that in read+write mode, the real root node ID IS NOT VALID. You should call `make_read_only()` to convert to read-only mode and then read `root_node_id`.

These optimizations also trade off memory for performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a bit to run. Use `cache_vacate` if you are over your memory limits.
#### IPFS HAMT Sample Code

```python
kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id  # our root node CID
print(cid)
```
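A possible follow-up to the sample above, reopening the same mapping later from the saved root CID; the `cid` variable is assumed to come from the previous snippet:

```python
kubo_cas = KuboCAS()
hamt = await HAMT.build(cas=kubo_cas, root_node_id=cid, read_only=True)
assert (await hamt.get("foo")) == "bar"
```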
```python
def __init__(
    self,
    cas: ContentAddressedStore,
    hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
    root_node_id: IPLDKind | None = None,
    read_only: bool = False,
    max_bucket_size: int = 4,
    values_are_bytes: bool = False,
)
```
Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
`cas`: The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS.
`hash_fn`: This is the hash function used to place a key-value within the HAMT.

To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a python bytes object can only represent whole bytes.

Theoretically your hash only needs to be a minimum of 1 byte long, and the number of hash collisions must be less than or equal to the bucket size. Any more and the HAMT will most likely throw errors.
`values_are_bytes`: Set this to True if you are only going to be storing python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
`max_bucket_size`: This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left unprovided.

This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets, even on a content addressed system, by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes result in larger Nodes, but more time taken to retrieve and decode these nodes from your backing CAS.

This must be a positive integer with a minimum of 1.
`root_node_id`: This is type IPLDKind, but the documentation generator pdoc mangles it a bit.

Read from this only when in read-only mode to get something valid!
`read_only`: Clients should NOT modify this.

This is here for checking whether the HAMT is in read-only or read/write mode.

The distinction is made for performance and correctness reasons. In read-only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and writes go to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
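As a rough illustration of these constructor options (the values chosen here are arbitrary examples, not recommendations), one might write:

```python
from py_hamt import HAMT, InMemoryCAS

cas = InMemoryCAS()  # in-memory backend, handy for tests
hamt = await HAMT.build(
    cas=cas,
    max_bucket_size=8,      # larger buckets: fewer, larger nodes
    values_are_bytes=True,  # skip dag-cbor encoding; values must already be bytes
)
await hamt.set("chunk/0", b"\x00\x01\x02")
assert (await hamt.get("chunk/0")) == b"\x00\x01\x02"
```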
```python
@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> "HAMT"
```
Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are the exact same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow an async `__init__`, so this method is provided separately.
```python
async def make_read_only(self) -> None
```
Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
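A small sketch of the pattern this enables, assuming a populated `hamt`; the keys below are hypothetical:

```python
import asyncio  # for gather

await hamt.make_read_only()
values = await asyncio.gather(
    hamt.get("temperature"),
    hamt.get("humidity"),
    hamt.get("pressure"),
)
```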
```python
async def enable_write(self) -> None
```
Enable both reads and writes. This creates an internal structure for performance optimizations which will result in the root node ID no longer being valid; in order to read it at the end of your operations you must first use `make_read_only`.
```python
async def cache_size(self) -> int
```
Returns the memory used by some internal performance optimization tools in bytes.
This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.
Be warned that this may take a while to run for large HAMTs.
For more on memory management, see the `HAMT` class documentation.
```python
async def cache_vacate(self) -> None
```
Vacate and completely empty out the internal read/write cache.
Be warned that this may take a while if there have been a lot of write operations.
For more on memory management, see the `HAMT` class documentation.
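A possible way to combine the two calls into a crude memory budget check; the 256 MiB threshold is an arbitrary example value:

```python
approx_bytes = await hamt.cache_size()
if approx_bytes > 256 * 1024 * 1024:  # arbitrary 256 MiB budget
    await hamt.cache_vacate()
```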
```python
async def set(self, key: str, val: IPLDKind) -> None
```
Write a key-value mapping.
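Values are arbitrary IPLD data and are dag-cbor encoded unless `values_are_bytes=True` was set. A brief sketch with a hypothetical key and value:

```python
# requires values_are_bytes=False (the default) so the dict is dag-cbor encoded
await hamt.set("station/1", {"name": "alpha", "readings": [1.5, 2.0]})
record = await hamt.get("station/1")
```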
```python
async def delete(self, key: str) -> None
```
Delete a key-value mapping.
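Deleting a key that does not exist raises `KeyError`, so a second delete of the same hypothetical key fails:

```python
await hamt.set("tmp", b"x")   # hypothetical key
await hamt.delete("tmp")
try:
    await hamt.delete("tmp")  # already gone
except KeyError:
    pass
```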
```python
async def get(self, key: str) -> IPLDKind
```
Get a value.
```python
async def get_pointer(self, key: str) -> IPLDKind
```
Get a store ID that points to the value for this key.
This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by python, so they can easily be used as IDs for read caches. This is utilized in `ZarrHAMTStore`, for example.
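A hypothetical read cache built on this, relying only on pointers being immutable IDs and on `cas.load` returning the stored bytes; the `cached_get_bytes` helper is illustrative, not part of py-hamt:

```python
value_cache: dict = {}  # pointer -> raw stored bytes

async def cached_get_bytes(hamt, key: str) -> bytes:
    pointer = await hamt.get_pointer(key)
    if pointer not in value_cache:
        value_cache[pointer] = await hamt.cas.load(pointer)
    return value_cache[pointer]
```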
```python
async def keys(self) -> AsyncIterator[str]
```
AsyncIterator returning all keys in the HAMT.
If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
When the HAMT is in read only mode however, this can be run concurrently with get operations.
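For example, streaming all keys and counting them, assuming an existing `hamt`:

```python
async for key in hamt.keys():
    print(key)

count = await hamt.len()  # number of key-value mappings
```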
```python
async def len(self) -> int
```
Return the number of key value mappings in this HAMT.
When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read only, then this can be run concurrently with other operations.
```python
class ContentAddressedStore(ABC):
    """
    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

    #### A note on the IPLDKind return types
    Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
    1. No lists or dicts, since python does not classify these as immutable.
    2. No `None` values since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
    """

    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """

    @abstractmethod
    async def load(self, id: IPLDKind) -> bytes:
        """Retrieve data."""
```
Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

Note that the return type of save and the input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

#### A note on the IPLDKind return types

Save and load return the type IPLDKind and not just a CID. As long as python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:

- No lists or dicts, since python does not classify these as immutable.
- No `None` values, since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
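A minimal sketch of a custom backend under these rules; the `DirectoryCAS` class below is hypothetical and uses a sha2-256 hex digest string as its immutable ID:

```python
import hashlib
from pathlib import Path

from py_hamt import ContentAddressedStore


class DirectoryCAS(ContentAddressedStore):
    """Hypothetical CAS that stores each blob as a file named by its sha2-256 digest."""

    def __init__(self, root: Path):
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> str:
        # codec is ignored in this sketch; an immutable str digest serves as the ID
        digest = hashlib.sha256(data).hexdigest()
        (self.root / digest).write_bytes(data)
        return digest

    async def load(self, id) -> bytes:
        return (self.root / str(id)).read_bytes()
```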
```python
@abstractmethod
async def save(self, data: bytes, codec: CodecInput) -> IPLDKind
```
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

`codec` will be set to "dag-cbor" if this data should be marked as special linked data a la the IPLD data model.
Retrieve data.
```python
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing, this is why this is not exported. It hashes all inputs and uses that as a key to an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self):
        self.store = dict()
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

    async def load(self, id: IPLDKind) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )

        try:
            return self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc
```
Used mostly for faster testing, which is why this is not exported. It hashes all inputs and uses the hash as a key into an in-memory python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in.
```python
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes
```
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

`codec` will be set to "dag-cbor" if this data should be marked as special linked data a la the IPLD data model.
```python
async def load(self, id: IPLDKind) -> bytes
```
`ContentAddressedStore` allows any IPLD scalar key. For the in-memory backend we *require* a `bytes` hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. This is why we use `cast` here, to tell mypy that we know what we are doing.

h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
````python
class KuboCAS(ContentAddressedStore):
    """
    Connects to an **IPFS Kubo** daemon.

    The IDs in save and load are IPLD CIDs.

    * **save()** → RPC (`/api/v0/add`)
    * **load()** → HTTP gateway (`/ipfs/{cid}`)

    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.

    ### Authentication / custom headers
    You have two options:

    1. **Bring your own `httpx.AsyncClient`**
       Pass it via `client=...` — any default headers or auth
       configured on that client are reused for **every** request.
    2. **Let `KuboCAS` build the client** but pass
       `headers=` *and*/or `auth=` kwargs; they are forwarded to the
       internally-created `AsyncClient`.

    ```python
    import httpx
    from py_hamt import KuboCAS

    # Option 1: user-supplied client
    client = httpx.AsyncClient(
        headers={"Authorization": "Bearer <token>"},
        auth=("user", "pass"),
    )
    cas = KuboCAS(client=client)

    # Option 2: let KuboCAS create the client
    cas = KuboCAS(
        headers={"X-My-Header": "yes"},
        auth=("user", "pass"),
    )
    ```

    ### Parameters
    - **hasher** (str): multihash name (defaults to *blake3*).
    - **client** (`httpx.AsyncClient | None`): reuse an existing
      client; if *None* KuboCAS will create one lazily.
    - **headers** (dict[str, str] | None): default headers for the
      internally-created client.
    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
      for the internally-created client.
    - **rpc_base_url / gateway_base_url** (str | None): override daemon
      endpoints (defaults match the local daemon ports).
    - **chunker** (str): chunking algorithm specification for Kubo's `add`
      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
      `"rabin-<min>-<avg>-<max>"`.

    ...
    """

    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"

    DAG_PB_MARKER: int = 0x70
    """@private"""

    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
    def __init__(
        self,
        hasher: str = "blake3",
        client: httpx.AsyncClient | None = None,
        rpc_base_url: str | None = None,
        gateway_base_url: str | None = None,
        concurrency: int = 32,
        *,
        headers: dict[str, str] | None = None,
        auth: Tuple[str, str] | None = None,
        chunker: str = "size-1048576",
        max_retries: int = 3,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
    ):
        """
        If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

        ### `httpx.AsyncClient` Management
        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time, using `await cas.aclose()`,
        as a class instance cannot know when it will no longer be in use, unless explicitly told to do so.

        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited, which is what we suggest below:
        ```python
        async with httpx.AsyncClient() as client, KuboCAS(
            rpc_base_url=rpc_base_url,
            gateway_base_url=gateway_base_url,
            client=client,
        ) as kubo_cas:
            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
            zhs = ZarrHAMTStore(hamt)
            # Use the KuboCAS instance as needed
            # ...
        ```
        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
        ```python
        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
        # Use the KuboCAS instance as needed
        # ...
        await cas.aclose()  # Ensure resources are cleaned up
        ```

        ### Authenticated RPC/Gateway Access
        Users can set whatever headers and auth credentials they need if they are connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
        Alternatively, they can pass in `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
        If you do not need authentication, you can leave these parameters as `None`.

        ### RPC and HTTP Gateway Base URLs
        These are the first part of the url; defaults that refer to what kubo launches with on a local machine are provided.
        """

        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
        if re.fullmatch(chunker_pattern, chunker) is None:
            raise ValueError("Invalid chunker specification")
        self.chunker: str = chunker

        self.hasher: str = hasher
        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT."""

        if rpc_base_url is None:
            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
        if gateway_base_url is None:
            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

        if "/ipfs/" in gateway_base_url:
            gateway_base_url = gateway_base_url.split("/ipfs/")[0]

        # Standard gateway URL construction with proper path handling
        if gateway_base_url.endswith("/"):
            gateway_base_url = f"{gateway_base_url}ipfs/"
        else:
            gateway_base_url = f"{gateway_base_url}/ipfs/"

        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
        """@private"""
        self.gateway_base_url: str = gateway_base_url
        """@private"""

        if client is not None:
            # A client was supplied by the user. We don't own it.
            self._owns_client = False
            self._client_per_loop = {asyncio.get_running_loop(): client}
        else:
            # No client supplied. We will own any clients we create.
            self._owns_client = True
            self._client_per_loop = {}

        # store for later use by _loop_client()
        self._default_headers = headers
        self._default_auth = auth

        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
        self._closed: bool = False

        # Validate retry parameters
        if max_retries < 0:
            raise ValueError("max_retries must be non-negative")
        if initial_delay <= 0:
            raise ValueError("initial_delay must be positive")
        if backoff_factor < 1.0:
            raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")

        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.backoff_factor = backoff_factor

    # --------------------------------------------------------------------- #
    # helper: get or create the client bound to the current running loop    #
    # --------------------------------------------------------------------- #
    def _loop_client(self) -> httpx.AsyncClient:
        """Get or create a client for the current event loop.

        If the instance was previously closed but owns its clients, a fresh
        client mapping is lazily created on demand. Users that supplied their
        own ``httpx.AsyncClient`` still receive an error when the instance has
        been closed, as we cannot safely recreate their client.
        """
        if self._closed:
            if not self._owns_client:
                raise RuntimeError("KuboCAS is closed; create a new instance")
            # We previously closed all internally-owned clients. Reset the
            # state so that new clients can be created lazily.
            self._closed = False
            self._client_per_loop = {}

        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        try:
            return self._client_per_loop[loop]
        except KeyError:
            # Create a new client
            client = httpx.AsyncClient(
                timeout=60.0,
                headers=self._default_headers,
                auth=self._default_auth,
                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
                # Uncomment when they finally support Robust HTTP/2 GOAWAY responses
                # http2=True,
            )
            self._client_per_loop[loop] = client
            return client

    # --------------------------------------------------------------------- #
    # graceful shutdown: close **all** clients we own                        #
    # --------------------------------------------------------------------- #
    async def aclose(self) -> None:
        """
        Closes all internally-created clients. Must be called from an async context.
        """
        if self._owns_client is False:  # external client → caller closes
            return

        # This method is async, so we can reliably await the async close method.
        # The complex sync/async logic is handled by __del__.
        for client in list(self._client_per_loop.values()):
            if not client.is_closed:
                try:
                    await client.aclose()
                except Exception:
                    pass  # best-effort cleanup

        self._client_per_loop.clear()
        self._closed = True

        # At this point, _client_per_loop should be empty or only contain
        # clients from loops we haven't seen (which shouldn't happen in practice)

    async def __aenter__(self) -> "KuboCAS":
        return self

    async def __aexit__(self, *exc: Any) -> None:
        await self.aclose()

    def __del__(self) -> None:
        """Best-effort close for internally-created clients."""
        if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
            return

        if not self._owns_client or self._closed:
            return

        # Attempt proper cleanup if possible
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop - can't do async cleanup
            # Just clear the client references synchronously
            if hasattr(self, "_client_per_loop"):
                # We can't await client.aclose() without a loop,
                # so just clear the references
                self._client_per_loop.clear()
            self._closed = True
            return

        # If we get here, we have a running loop
        try:
            if loop.is_running():
                # Schedule cleanup in the existing loop
                loop.create_task(self.aclose())
            else:
                # Loop exists but not running - try asyncio.run
                coro = self.aclose()  # Create the coroutine
                try:
                    asyncio.run(coro)
                except Exception:
                    # If asyncio.run fails, we need to close the coroutine properly
                    coro.close()  # This prevents the RuntimeWarning
                    raise  # Re-raise to hit the outer except block
        except Exception:
            # If all else fails, just clear references
            if hasattr(self, "_client_per_loop"):
                self._client_per_loop.clear()
            self._closed = True

    # --------------------------------------------------------------------- #
    # save() – now uses the per-loop client                                  #
    # --------------------------------------------------------------------- #
    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
        async with self._sem:
            files = {"file": data}
            client = self._loop_client()
            retry_count = 0

            while retry_count <= self.max_retries:
                try:
                    response = await client.post(
                        self.rpc_url, files=files, timeout=60.0
                    )
                    response.raise_for_status()
                    cid_str: str = response.json()["Hash"]
                    cid: CID = CID.decode(cid_str)
                    if cid.codec.code != self.DAG_PB_MARKER:
                        cid = cid.set(codec=codec)
                    return cid

                except (httpx.TimeoutException, httpx.RequestError) as e:
                    retry_count += 1
                    if retry_count > self.max_retries:
                        raise httpx.TimeoutException(
                            f"Failed to save data after {self.max_retries} retries: {str(e)}",
                            request=e.request
                            if isinstance(e, httpx.RequestError)
                            else None,
                        )

                    # Calculate backoff delay
                    delay = self.initial_delay * (
                        self.backoff_factor ** (retry_count - 1)
                    )
                    # Add some jitter to prevent thundering herd
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)

                except httpx.HTTPStatusError:
                    # Re-raise non-timeout HTTP errors immediately
                    raise
        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

    async def load(self, id: IPLDKind) -> bytes:
        cid = cast(CID, id)
        url: str = f"{self.gateway_base_url + str(cid)}"
        async with self._sem:
            client = self._loop_client()
            retry_count = 0

            while retry_count <= self.max_retries:
                try:
                    response = await client.get(url, timeout=60.0)
                    response.raise_for_status()
                    return response.content

                except (httpx.TimeoutException, httpx.RequestError) as e:
                    retry_count += 1
                    if retry_count > self.max_retries:
                        raise httpx.TimeoutException(
                            f"Failed to load data after {self.max_retries} retries: {str(e)}",
                            request=e.request
                            if isinstance(e, httpx.RequestError)
                            else None,
                        )

                    # Calculate backoff delay
                    delay = self.initial_delay * (
                        self.backoff_factor ** (retry_count - 1)
                    )
                    # Add some jitter to prevent thundering herd
                    jitter = delay * 0.1 * (random.random() - 0.5)
                    await asyncio.sleep(delay + jitter)

                except httpx.HTTPStatusError:
                    # Re-raise non-timeout HTTP errors immediately
                    raise
        raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
````
Connects to an IPFS Kubo daemon.
The IDs in save and load are IPLD CIDs.
- save() → RPC (`/api/v0/add`)
- load() → HTTP gateway (`/ipfs/{cid}`)
`save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
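For instance, a read-only consumer could be wired to a gateway alone; the gateway URL and `root_cid` below are placeholders, not recommendations:

```python
cas = KuboCAS(gateway_base_url="https://ipfs.io")  # placeholder public gateway
hamt = await HAMT.build(cas=cas, root_node_id=root_cid, read_only=True)
```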
### Authentication / custom headers
You have two options:
1. **Bring your own `httpx.AsyncClient`** — pass it via `client=...`; any default headers or auth configured on that client are reused for every request.
2. **Let `KuboCAS` build the client** but pass `headers=` and/or `auth=` kwargs; they are forwarded to the internally-created `AsyncClient`.
```python
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied client
client = httpx.AsyncClient(
    headers={"Authorization": "Bearer <token>"},
    auth=("user", "pass"),
)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the client
cas = KuboCAS(
    headers={"X-My-Header": "yes"},
    auth=("user", "pass"),
)
```
### Parameters
- **hasher** (str): multihash name (defaults to *blake3*).
- **client** (`httpx.AsyncClient | None`): reuse an existing client; if None, KuboCAS will create one lazily.
- **headers** (`dict[str, str] | None`): default headers for the internally-created client.
- **auth** (`tuple[str, str] | None`): authentication tuple (username, password) for the internally-created client.
- **rpc_base_url / gateway_base_url** (`str | None`): override daemon endpoints (defaults match the local daemon ports).
- **chunker** (str): chunking algorithm specification for Kubo's `add` RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or `"rabin-<min>-<avg>-<max>"`.
...
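A hedged example of tuning these parameters; the endpoint URLs and chunker value are illustrative only:

```python
cas = KuboCAS(
    rpc_base_url="http://127.0.0.1:5001",      # local daemon RPC
    gateway_base_url="http://127.0.0.1:8080",  # local daemon gateway
    chunker="size-262144",   # must match size-<n>, rabin, or rabin-<min>-<avg>-<max>
    max_retries=5,           # retry transient RPC/gateway failures with backoff
)
```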
```python
def __init__(
    self,
    hasher: str = "blake3",
    client: httpx.AsyncClient | None = None,
    rpc_base_url: str | None = None,
    gateway_base_url: str | None = None,
    concurrency: int = 32,
    *,
    headers: dict[str, str] | None = None,
    auth: Tuple[str, str] | None = None,
    chunker: str = "size-1048576",
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
)
```
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
`httpx.AsyncClient` Management
If `client` is not provided, one is initialized automatically. It is then the user's responsibility to close it at an appropriate time with `await cas.aclose()`, since a class instance cannot know when it is no longer in use unless explicitly told so.
If you use the `KuboCAS` instance in an `async with` block, the client is closed automatically when the block exits; this is the approach we suggest:
```python
async with httpx.AsyncClient() as client, KuboCAS(
    rpc_base_url=rpc_base_url,
    gateway_base_url=gateway_base_url,
    client=client,
) as kubo_cas:
    hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
    zhs = ZarrHAMTStore(hamt)
    # Use the KuboCAS instance as needed
    # ...
```
As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up:
```python
cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose()  # Ensure resources are cleaned up
```
Authenticated RPC/Gateway Access
Users connecting to an authenticated kubo instance can set whatever headers and auth credentials they need on their own `httpx.AsyncClient` and pass that in. Alternatively, they can pass the `headers` and `auth` parameters to the constructor, which are used to create a new `httpx.AsyncClient` if one is not provided. If you do not need authentication, leave these parameters as `None`.
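A minimal sketch of the constructor-based approach; the header name and credentials below are placeholders, not values py-hamt requires:
```python
import asyncio

from py_hamt import KuboCAS


async def main() -> None:
    # Placeholder header and (username, password) pair for an authenticated daemon.
    cas = KuboCAS(
        headers={"X-My-Header": "yes"},
        auth=("user", "pass"),
    )
    try:
        ...  # use the CAS as usual
    finally:
        await cas.aclose()  # closes the internally-created client


asyncio.run(main())
```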
RPC and HTTP Gateway Base URLs
These form the first part of the request URLs; the defaults match the endpoints that a kubo daemon launches with on a local machine.
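For example, a daemon on another machine can be targeted by overriding both base URLs; the addresses below are placeholders, and omitting both arguments keeps the local-daemon defaults:
```python
from py_hamt import KuboCAS

# Placeholder endpoints for a kubo daemon reachable over the network.
remote_cas = KuboCAS(
    rpc_base_url="http://10.0.0.5:5001",
    gateway_base_url="http://10.0.0.5:8080",
)

# No URLs given: talks to the local daemon's default RPC and gateway ports.
local_cas = KuboCAS()
```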
The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default blake3 follows the default hashing algorithm used by HAMT.
284 async def aclose(self) -> None: 285 """ 286 Closes all internally-created clients. Must be called from an async context. 287 """ 288 if self._owns_client is False: # external client → caller closes 289 return 290 291 # This method is async, so we can reliably await the async close method. 292 # The complex sync/async logic is handled by __del__. 293 for client in list(self._client_per_loop.values()): 294 if not client.is_closed: 295 try: 296 await client.aclose() 297 except Exception: 298 pass # best-effort cleanup 299 300 self._client_per_loop.clear() 301 self._closed = True
Closes all internally-created clients. Must be called from an async context.
355 async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID: 356 async with self._sem: 357 files = {"file": data} 358 client = self._loop_client() 359 retry_count = 0 360 361 while retry_count <= self.max_retries: 362 try: 363 response = await client.post( 364 self.rpc_url, files=files, timeout=60.0 365 ) 366 response.raise_for_status() 367 cid_str: str = response.json()["Hash"] 368 cid: CID = CID.decode(cid_str) 369 if cid.codec.code != self.DAG_PB_MARKER: 370 cid = cid.set(codec=codec) 371 return cid 372 373 except (httpx.TimeoutException, httpx.RequestError) as e: 374 retry_count += 1 375 if retry_count > self.max_retries: 376 raise httpx.TimeoutException( 377 f"Failed to save data after {self.max_retries} retries: {str(e)}", 378 request=e.request 379 if isinstance(e, httpx.RequestError) 380 else None, 381 ) 382 383 # Calculate backoff delay 384 delay = self.initial_delay * ( 385 self.backoff_factor ** (retry_count - 1) 386 ) 387 # Add some jitter to prevent thundering herd 388 jitter = delay * 0.1 * (random.random() - 0.5) 389 await asyncio.sleep(delay + jitter) 390 391 except httpx.HTTPStatusError: 392 # Re-raise non-timeout HTTP errors immediately 393 raise 394 raise RuntimeError("Exited the retry loop unexpectedly.") # pragma: no cover
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.
`codec` will be set to "dag-cbor" if this data should be marked as special linked data per the IPLD data model.
396 async def load(self, id: IPLDKind) -> bytes: 397 cid = cast(CID, id) 398 url: str = f"{self.gateway_base_url + str(cid)}" 399 async with self._sem: 400 client = self._loop_client() 401 retry_count = 0 402 403 while retry_count <= self.max_retries: 404 try: 405 response = await client.get(url, timeout=60.0) 406 response.raise_for_status() 407 return response.content 408 409 except (httpx.TimeoutException, httpx.RequestError) as e: 410 retry_count += 1 411 if retry_count > self.max_retries: 412 raise httpx.TimeoutException( 413 f"Failed to load data after {self.max_retries} retries: {str(e)}", 414 request=e.request 415 if isinstance(e, httpx.RequestError) 416 else None, 417 ) 418 419 # Calculate backoff delay 420 delay = self.initial_delay * ( 421 self.backoff_factor ** (retry_count - 1) 422 ) 423 # Add some jitter to prevent thundering herd 424 jitter = delay * 0.1 * (random.random() - 0.5) 425 await asyncio.sleep(delay + jitter) 426 427 except httpx.HTTPStatusError: 428 # Re-raise non-timeout HTTP errors immediately 429 raise 430 raise RuntimeError("Exited the retry loop unexpectedly.") # pragma: no cover
Retrieve data.
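A minimal save/load round trip, assuming a local kubo daemon is running; the `"raw"` codec label is an assumption here, so check `ContentAddressedStore.CodecInput` for the exact values your version accepts:
```python
import asyncio

from py_hamt import KuboCAS


async def main() -> None:
    async with KuboCAS() as cas:  # local daemon assumed
        # "raw" is assumed for plain byte payloads; linked IPLD nodes use "dag-cbor".
        cid = await cas.save(b"hello world", "raw")
        assert await cas.load(cid) == b"hello world"


asyncio.run(main())
```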
12class ZarrHAMTStore(zarr.abc.store.Store): 13 """ 14 Write and read Zarr v3s with a HAMT. 15 16 Read **or** write a Zarr-v3 store whose key/value pairs live inside a 17 py-hamt mapping. 18 19 Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and 20 the value is the raw byte payload produced by Zarr. No additional 21 framing, compression, or encryption is applied by this class. For a zarr encryption example 22 see where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb 23 For a fully encrypted zarr store, where metadata is not available, please see 24 :class:`SimpleEncryptedZarrHAMTStore` but we do not recommend using it. 25 26 #### A note about using the same `ZarrHAMTStore` for writing and then reading again 27 If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings. 28 29 #### Sample Code 30 ```python 31 # --- Write --- 32 ds: xarray.Dataset = # ... 33 cas: ContentAddressedStore = # ... 34 hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes 35 hamt = await HAMT.build(cas, values_are_bytes=True) # write-enabled 36 zhs = ZarrHAMTStore(hamt, read_only=False) 37 ds.to_zarr(store=zhs, mode="w", zarr_format=3) 38 await hamt.make_read_only() # flush + freeze 39 root_node_id = hamt.root_node_id 40 print(root_node_id) 41 42 # --- read --- 43 hamt_ro = await HAMT.build( 44 cas, root_node_id=root_cid, read_only=True, values_are_bytes=True 45 ) 46 zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True) 47 ds_ro = xarray.open_zarr(store=zhs_ro) 48 49 50 print(ds_ro) 51 xarray.testing.assert_identical(ds, ds_ro) 52 ``` 53 """ 54 55 _forced_read_only: bool | None = None # sentinel for wrapper clones 56 57 def __init__(self, hamt: HAMT, read_only: bool = False) -> None: 58 """ 59 ### `hamt` and `read_only` 60 You need to make sure the following two things are true: 61 62 1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async. 63 2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations. 64 65 ##### A note about the zarr chunk separator, "/" vs "." 66 While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well. 67 68 #### Metadata Read Cache 69 `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data. 70 """ 71 super().__init__(read_only=read_only) 72 73 assert hamt.read_only == read_only 74 assert hamt.values_are_bytes 75 self.hamt: HAMT = hamt 76 """ 77 The internal HAMT. 78 Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID. 
79 """ 80 81 self.metadata_read_cache: dict[str, bytes] = {} 82 """@private""" 83 84 @property 85 def read_only(self) -> bool: # type: ignore[override] 86 if self._forced_read_only is not None: # instance attr overrides 87 return self._forced_read_only 88 return self.hamt.read_only 89 90 def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore": 91 """ 92 Return this store (if the flag already matches) or a *shallow* 93 clone that presents the requested read‑only status. 94 95 The clone **shares** the same :class:`~py_hamt.hamt.HAMT` 96 instance; no flushing, network traffic or async work is done. 97 """ 98 # Fast path 99 if read_only == self.read_only: 100 return self # Same mode, return same instance 101 102 # Create new instance with different read_only flag 103 # Creates a *bare* instance without running its __init__ 104 clone = type(self).__new__(type(self)) 105 106 # Copy attributes that matter 107 clone.hamt = self.hamt # Share the HAMT 108 clone._forced_read_only = read_only 109 clone.metadata_read_cache = self.metadata_read_cache.copy() 110 111 # Re‑initialise the zarr base class so that Zarr sees the flag 112 zarr.abc.store.Store.__init__(clone, read_only=read_only) 113 return clone 114 115 def __eq__(self, other: object) -> bool: 116 """@private""" 117 if not isinstance(other, ZarrHAMTStore): 118 return False 119 return self.hamt.root_node_id == other.hamt.root_node_id 120 121 async def get( 122 self, 123 key: str, 124 prototype: zarr.core.buffer.BufferPrototype, 125 byte_range: zarr.abc.store.ByteRequest | None = None, 126 ) -> zarr.core.buffer.Buffer | None: 127 """@private""" 128 try: 129 val: bytes 130 # do len check to avoid indexing into overly short strings, 3.12 does not throw errors but we dont know if other versions will 131 is_metadata: bool = ( 132 len(key) >= 9 and key[-9:] == "zarr.json" 133 ) # if path ends with zarr.json 134 135 if is_metadata and key in self.metadata_read_cache: 136 val = self.metadata_read_cache[key] 137 else: 138 val = cast( 139 bytes, await self.hamt.get(key) 140 ) # We know values received will always be bytes since we only store bytes in the HAMT 141 if is_metadata: 142 self.metadata_read_cache[key] = val 143 144 return prototype.buffer.from_bytes(val) 145 except KeyError: 146 # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases 147 return None 148 149 async def get_partial_values( 150 self, 151 prototype: zarr.core.buffer.BufferPrototype, 152 key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]], 153 ) -> list[zarr.core.buffer.Buffer | None]: 154 """@private""" 155 raise NotImplementedError 156 157 async def exists(self, key: str) -> bool: 158 """@private""" 159 try: 160 await self.hamt.get(key) 161 return True 162 except KeyError: 163 return False 164 165 @property 166 def supports_writes(self) -> bool: 167 """@private""" 168 return not self.hamt.read_only 169 170 @property 171 def supports_partial_writes(self) -> bool: 172 """@private""" 173 return False 174 175 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 176 """@private""" 177 if self.read_only: 178 raise Exception("Cannot write to a read-only store.") 179 180 if key in self.metadata_read_cache: 181 self.metadata_read_cache[key] = value.to_bytes() 182 await self.hamt.set(key, value.to_bytes()) 183 184 async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None: 185 """@private""" 186 if not (await self.exists(key)): 187 await self.set(key, value) 188 189 async def 
set_partial_values( 190 self, key_start_values: Iterable[tuple[str, int, BytesLike]] 191 ) -> None: 192 """@private""" 193 raise NotImplementedError 194 195 @property 196 def supports_deletes(self) -> bool: 197 """@private""" 198 return not self.hamt.read_only 199 200 async def delete(self, key: str) -> None: 201 """@private""" 202 if self.read_only: 203 raise Exception("Cannot write to a read-only store.") 204 try: 205 await self.hamt.delete(key) 206 # In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo 207 # if key in self.metadata_read_cache: 208 # del self.metadata_read_cache[key] 209 # It's fine if the key was not in the HAMT 210 # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues 211 except KeyError: 212 return 213 214 @property 215 def supports_listing(self) -> bool: 216 """@private""" 217 return True 218 219 async def list(self) -> AsyncIterator[str]: 220 """@private""" 221 async for key in self.hamt.keys(): 222 yield key 223 224 async def list_prefix(self, prefix: str) -> AsyncIterator[str]: 225 """@private""" 226 async for key in self.hamt.keys(): 227 if key.startswith(prefix): 228 yield key 229 230 async def list_dir(self, prefix: str) -> AsyncIterator[str]: 231 """ 232 @private 233 List *immediate* children that live directly under **prefix**. 234 235 This is similar to :py:meth:`list_prefix` but collapses everything 236 below the first ``"/"`` after *prefix*. Each child name is yielded 237 **exactly once** in the order of first appearance while scanning the 238 HAMT keys. 239 240 Parameters 241 ---------- 242 prefix : str 243 Logical directory path. *Must* end with ``"/"`` for the result to 244 make sense (e.g. ``"a/b/"``). 245 246 Yields 247 ------ 248 str 249 The name of each direct child (file or sub-directory) of *prefix*. 250 251 Examples 252 -------- 253 With keys :: 254 255 a/b/c/d 256 a/b/c/e 257 a/b/f 258 a/b/g/h/i 259 260 ``await list_dir("a/b/")`` produces :: 261 262 c 263 f 264 g 265 266 Notes 267 ----- 268 • Internally uses a :class:`set` to deduplicate names; memory grows 269 with the number of *unique* children, not the total number of keys. 270 • Order is **not** sorted; it reflects the first encounter while 271 iterating over :py:meth:`HAMT.keys`. 272 """ 273 seen_names: set[str] = set() 274 async for key in self.hamt.keys(): 275 if key.startswith(prefix): 276 suffix: str = key[len(prefix) :] 277 first_slash: int = suffix.find("/") 278 if first_slash == -1: 279 if suffix not in seen_names: 280 seen_names.add(suffix) 281 yield suffix 282 else: 283 name: str = suffix[0:first_slash] 284 if name not in seen_names: 285 seen_names.add(name) 286 yield name
Write and read Zarr v3s with a HAMT.
Read or write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.
Keys are stored verbatim (`"temp/c/0/0/0"` → the same string in the HAMT) and the value is the raw byte payload produced by Zarr. No additional framing, compression, or encryption is applied by this class. For an example of Zarr encryption where the metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
For a fully encrypted Zarr store, where metadata is not readable, see `SimpleEncryptedZarrHAMTStore`, but we do not recommend using it.
A note about using the same `ZarrHAMTStore` for writing and then reading again
If you write a Zarr to a HAMT and then switch it to read-only mode, it is best to reinitialize a new `ZarrHAMTStore` with the proper read-only setting. This is because this class, to err on the safe side, will not touch its superclass's settings.
Sample Code
```python
# --- Write ---
ds: xarray.Dataset = ...  # your dataset
cas: ContentAddressedStore = ...  # e.g. a KuboCAS instance
hamt = await HAMT.build(cas, values_are_bytes=True)  # write-enabled; values_are_bytes must be True and read_only False for writes
zhs = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only()  # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)

# --- Read ---
hamt_ro = await HAMT.build(
    cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)

print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
```
57 def __init__(self, hamt: HAMT, read_only: bool = False) -> None: 58 """ 59 ### `hamt` and `read_only` 60 You need to make sure the following two things are true: 61 62 1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async. 63 2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations. 64 65 ##### A note about the zarr chunk separator, "/" vs "." 66 While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well. 67 68 #### Metadata Read Cache 69 `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data. 70 """ 71 super().__init__(read_only=read_only) 72 73 assert hamt.read_only == read_only 74 assert hamt.values_are_bytes 75 self.hamt: HAMT = hamt 76 """ 77 The internal HAMT. 78 Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID. 79 """ 80 81 self.metadata_read_cache: dict[str, bytes] = {} 82 """@private"""
`hamt` and `read_only`
You need to make sure the following two things are true (a short sketch follows this list):
- The HAMT is in the same read-only mode that you are passing into the Zarr store, i.e. `hamt.read_only == read_only`. This is required because switching a HAMT to read-only involves async operations, which `__init__` cannot perform.
- The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.
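A minimal sketch, inside an async context and with an existing `cas`, that satisfies both requirements:
```python
# `cas` is any ContentAddressedStore, e.g. a KuboCAS instance you already created.
hamt = await HAMT.build(cas=cas, values_are_bytes=True)  # read_only defaults to False
assert not hamt.read_only and hamt.values_are_bytes
zhs = ZarrHAMTStore(hamt, read_only=False)  # flag matches the HAMT's mode
```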
A note about the zarr chunk separator, "/" vs "."
While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.
Metadata Read Cache
`ZarrHAMTStore` has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more frequently, and more repetitively, than any other keys, so caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are much smaller than the memory required by the Zarr data itself.
The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
84 @property 85 def read_only(self) -> bool: # type: ignore[override] 86 if self._forced_read_only is not None: # instance attr overrides 87 return self._forced_read_only 88 return self.hamt.read_only
Is the store read-only?
90 def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore": 91 """ 92 Return this store (if the flag already matches) or a *shallow* 93 clone that presents the requested read‑only status. 94 95 The clone **shares** the same :class:`~py_hamt.hamt.HAMT` 96 instance; no flushing, network traffic or async work is done. 97 """ 98 # Fast path 99 if read_only == self.read_only: 100 return self # Same mode, return same instance 101 102 # Create new instance with different read_only flag 103 # Creates a *bare* instance without running its __init__ 104 clone = type(self).__new__(type(self)) 105 106 # Copy attributes that matter 107 clone.hamt = self.hamt # Share the HAMT 108 clone._forced_read_only = read_only 109 clone.metadata_read_cache = self.metadata_read_cache.copy() 110 111 # Re‑initialise the zarr base class so that Zarr sees the flag 112 zarr.abc.store.Store.__init__(clone, read_only=read_only) 113 return clone
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same `HAMT` instance; no flushing, network traffic, or async work is done.
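A short usage sketch; `zhs` is assumed to be an existing write-mode `ZarrHAMTStore`:
```python
# Present a read-only view that shares the same underlying HAMT.
zhs_ro = zhs.with_read_only(True)
assert zhs_ro.read_only and zhs_ro.hamt is zhs.hamt

# Requesting the mode the store already has returns the same instance.
assert zhs.with_read_only(False) is zhs
```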
13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore): 14 """ 15 Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy. 16 17 This store uses ChaCha20-Poly1305 to encrypt every single key-value pair 18 stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.) 19 and data chunks. This provides strong privacy but means the Zarr store is 20 completely opaque and unusable without the correct encryption key and header. 21 22 Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb 23 24 #### Encryption Details 25 - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce). 26 - Requires a 32-byte encryption key and a header. 27 - Each encrypted value includes a 24-byte nonce and a 16-byte tag. 28 29 #### Important Considerations 30 - Since metadata is encrypted, standard Zarr tools cannot inspect the 31 dataset without prior decryption using this store class. 32 - There is no support for partial encryption or excluding variables. 33 - There is no metadata caching. 34 35 #### Sample Code 36 ```python 37 import xarray 38 from py_hamt import HAMT, KuboCAS # Assuming an KuboCAS or similar 39 from Crypto.Random import get_random_bytes 40 import numpy as np 41 42 # Setup 43 ds = xarray.Dataset( 44 {"data": (("y", "x"), np.arange(12).reshape(3, 4))}, 45 coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]} 46 ) 47 cas = KuboCAS() # Example ContentAddressedStore 48 encryption_key = get_random_bytes(32) 49 header = b"fully-encrypted-zarr" 50 51 # --- Write --- 52 hamt_write = await HAMT.build(cas=cas, values_are_bytes=True) 53 ezhs_write = SimpleEncryptedZarrHAMTStore( 54 hamt_write, False, encryption_key, header 55 ) 56 print("Writing fully encrypted Zarr...") 57 ds.to_zarr(store=ezhs_write, mode="w") 58 await hamt_write.make_read_only() 59 root_node_id = hamt_write.root_node_id 60 print(f"Wrote Zarr with root: {root_node_id}") 61 62 # --- Read --- 63 hamt_read = await HAMT.build( 64 cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True 65 ) 66 ezhs_read = SimpleEncryptedZarrHAMTStore( 67 hamt_read, True, encryption_key, header 68 ) 69 print("\nReading fully encrypted Zarr...") 70 ds_read = xarray.open_zarr(store=ezhs_read) 71 print("Read back dataset:") 72 print(ds_read) 73 xarray.testing.assert_identical(ds, ds_read) 74 print("Read successful and data verified.") 75 76 # --- Read with wrong key (demonstrates failure) --- 77 wrong_key = get_random_bytes(32) 78 hamt_bad = await HAMT.build( 79 cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True 80 ) 81 ezhs_bad = SimpleEncryptedZarrHAMTStore( 82 hamt_bad, True, wrong_key, header 83 ) 84 print("\nAttempting to read with wrong key...") 85 try: 86 ds_bad = xarray.open_zarr(store=ezhs_bad) 87 print(ds_bad) 88 except Exception as e: 89 print(f"Failed to read as expected: {type(e).__name__} - {e}") 90 ``` 91 """ 92 93 def __init__( 94 self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes 95 ) -> None: 96 """ 97 Initializes the SimpleEncryptedZarrHAMTStore. 98 99 Args: 100 hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. 101 Its `read_only` status must match the `read_only` argument. 102 read_only: If True, the store is in read-only mode. 103 encryption_key: A 32-byte key for ChaCha20-Poly1305. 
104 header: A header (bytes) used as associated data in encryption. 105 """ 106 super().__init__(hamt, read_only=read_only) 107 108 if len(encryption_key) != 32: 109 raise ValueError("Encryption key must be exactly 32 bytes long.") 110 self.encryption_key = encryption_key 111 self.header = header 112 self.metadata_read_cache: dict[str, bytes] = {} 113 114 def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore": 115 if read_only == self.read_only: 116 return self 117 118 clone = type(self).__new__(type(self)) 119 clone.hamt = self.hamt 120 clone.encryption_key = self.encryption_key 121 clone.header = self.header 122 clone.metadata_read_cache = self.metadata_read_cache 123 clone._forced_read_only = read_only # safe; attribute is declared 124 zarr.abc.store.Store.__init__(clone, read_only=read_only) 125 return clone 126 127 def _encrypt(self, val: bytes) -> bytes: 128 """Encrypts data using ChaCha20-Poly1305.""" 129 nonce = get_random_bytes(24) # XChaCha20 uses a 24-byte nonce 130 cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce) 131 cipher.update(self.header) 132 ciphertext, tag = cipher.encrypt_and_digest(val) 133 return nonce + tag + ciphertext 134 135 def _decrypt(self, val: bytes) -> bytes: 136 """Decrypts data using ChaCha20-Poly1305.""" 137 try: 138 # Extract nonce (24), tag (16), and ciphertext 139 nonce, tag, ciphertext = val[:24], val[24:40], val[40:] 140 cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce) 141 cipher.update(self.header) 142 plaintext = cipher.decrypt_and_verify(ciphertext, tag) 143 return plaintext 144 except Exception as e: 145 # Catching a broad exception as various issues (key, tag, length) can occur. 146 raise ValueError( 147 "Decryption failed. Check key, header, or data integrity." 148 ) from e 149 150 def __eq__(self, other: object) -> bool: 151 """@private""" 152 if not isinstance(other, SimpleEncryptedZarrHAMTStore): 153 return False 154 return ( 155 self.hamt.root_node_id == other.hamt.root_node_id 156 and self.encryption_key == other.encryption_key 157 and self.header == other.header 158 ) 159 160 async def get( 161 self, 162 key: str, 163 prototype: zarr.core.buffer.BufferPrototype, 164 byte_range: zarr.abc.store.ByteRequest | None = None, 165 ) -> zarr.core.buffer.Buffer | None: 166 """@private""" 167 try: 168 decrypted_val: bytes 169 is_metadata: bool = ( 170 len(key) >= 9 and key[-9:] == "zarr.json" 171 ) # if path ends with zarr.json 172 173 if is_metadata and key in self.metadata_read_cache: 174 decrypted_val = self.metadata_read_cache[key] 175 else: 176 raw_val = cast( 177 bytes, await self.hamt.get(key) 178 ) # We know values received will always be bytes since we only store bytes in the HAMT 179 decrypted_val = self._decrypt(raw_val) 180 if is_metadata: 181 self.metadata_read_cache[key] = decrypted_val 182 return prototype.buffer.from_bytes(decrypted_val) 183 except KeyError: 184 return None 185 186 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 187 """@private""" 188 if self.read_only: 189 raise Exception("Cannot write to a read-only store.") 190 191 raw_bytes = value.to_bytes() 192 if key in self.metadata_read_cache: 193 self.metadata_read_cache[key] = raw_bytes 194 # Encrypt it 195 encrypted_bytes = self._encrypt(raw_bytes) 196 await self.hamt.set(key, encrypted_bytes)
Write and read Zarr v3s with a HAMT, encrypting everything for maximum privacy.
This store uses ChaCha20-Poly1305 to encrypt every single key-value pair
stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.)
and data chunks. This provides strong privacy but means the Zarr store is
completely opaque and unusable without the correct encryption key and header.
Note: for standard Zarr encryption and decryption, where metadata remains readable, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value is stored as a 24-byte nonce, followed by a 16-byte tag, followed by the ciphertext (see the sketch below).
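As a rough illustration of that layout (nonce, then tag, then ciphertext), here is a standalone decryption sketch using pycryptodome; `stored`, `key`, and `header` are assumed to already exist:
```python
from Crypto.Cipher import ChaCha20_Poly1305


def decrypt_value(stored: bytes, key: bytes, header: bytes) -> bytes:
    # Split the stored value back into its three parts.
    nonce, tag, ciphertext = stored[:24], stored[24:40], stored[40:]
    cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)  # 24-byte nonce selects XChaCha20
    cipher.update(header)  # the header is bound as associated data
    return cipher.decrypt_and_verify(ciphertext, tag)  # raises ValueError on key/header/data mismatch
```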
#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the
dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- There is no metadata caching.
#### Sample Code
```python
import xarray
from py_hamt import HAMT, KuboCAS  # assuming a KuboCAS or similar
from Crypto.Random import get_random_bytes
import numpy as np

# Setup
ds = xarray.Dataset(
    {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
    coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]}
)
cas = KuboCAS()  # Example ContentAddressedStore
encryption_key = get_random_bytes(32)
header = b"fully-encrypted-zarr"

# --- Write ---
hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
ezhs_write = SimpleEncryptedZarrHAMTStore(
    hamt_write, False, encryption_key, header
)
print("Writing fully encrypted Zarr...")
ds.to_zarr(store=ezhs_write, mode="w")
await hamt_write.make_read_only()
root_node_id = hamt_write.root_node_id
print(f"Wrote Zarr with root: {root_node_id}")

# --- Read ---
hamt_read = await HAMT.build(
    cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
)
ezhs_read = SimpleEncryptedZarrHAMTStore(
    hamt_read, True, encryption_key, header
)
print("\nReading fully encrypted Zarr...")
ds_read = xarray.open_zarr(store=ezhs_read)
print("Read back dataset:")
print(ds_read)
xarray.testing.assert_identical(ds, ds_read)
print("Read successful and data verified.")

# --- Read with wrong key (demonstrates failure) ---
wrong_key = get_random_bytes(32)
hamt_bad = await HAMT.build(
    cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
ezhs_bad = SimpleEncryptedZarrHAMTStore(
    hamt_bad, True, wrong_key, header
)
print("\nAttempting to read with wrong key...")
try:
    ds_bad = xarray.open_zarr(store=ezhs_bad)
    print(ds_bad)
except Exception as e:
    print(f"Failed to read as expected: {type(e).__name__} - {e}")
```
93 def __init__( 94 self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes 95 ) -> None: 96 """ 97 Initializes the SimpleEncryptedZarrHAMTStore. 98 99 Args: 100 hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. 101 Its `read_only` status must match the `read_only` argument. 102 read_only: If True, the store is in read-only mode. 103 encryption_key: A 32-byte key for ChaCha20-Poly1305. 104 header: A header (bytes) used as associated data in encryption. 105 """ 106 super().__init__(hamt, read_only=read_only) 107 108 if len(encryption_key) != 32: 109 raise ValueError("Encryption key must be exactly 32 bytes long.") 110 self.encryption_key = encryption_key 111 self.header = header 112 self.metadata_read_cache: dict[str, bytes] = {}
Initializes the SimpleEncryptedZarrHAMTStore.
Args:
hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. Its `read_only` status must match the `read_only` argument.
read_only: If True, the store is in read-only mode.
encryption_key: A 32-byte key for ChaCha20-Poly1305.
header: A header (bytes) used as associated data in encryption.
114 def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore": 115 if read_only == self.read_only: 116 return self 117 118 clone = type(self).__new__(type(self)) 119 clone.hamt = self.hamt 120 clone.encryption_key = self.encryption_key 121 clone.header = self.header 122 clone.metadata_read_cache = self.metadata_read_cache 123 clone._forced_read_only = read_only # safe; attribute is declared 124 zarr.abc.store.Store.__init__(clone, read_only=read_only) 125 return clone
Return this store (if the flag already matches) or a shallow clone that presents the requested read‑only status.
The clone shares the same `HAMT` instance; no flushing, network traffic, or async work is done.