py_hamt
```python
from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
from .hamt import HAMT, blake3_hashfn
from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
from .zarr_hamt_store import ZarrHAMTStore

__all__ = [
    "blake3_hashfn",
    "HAMT",
    "ContentAddressedStore",
    "InMemoryCAS",
    "KuboCAS",
    "ZarrHAMTStore",
    "SimpleEncryptedZarrHAMTStore",
]

print("Running py-hamt from source!")
```
```python
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    This is the default blake3 hash function used for the `HAMT`, with a 32 byte hash size.
    """
    # 32 bytes is the recommended byte size for blake3 and the default, but multihash forces us to explicitly specify
    digest: bytes = b3.digest(input_bytes, size=32)
    raw_bytes: bytes = b3.unwrap(digest)
    return raw_bytes
```
This is the default blake3 hash function used for the `HAMT`, with a 32 byte hash size.
````python
class HAMT:
    """
    An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.

    Use this to store arbitrarily large key-value mappings in your CAS of choice.

    For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.

    When in read-only mode, the HAMT is both async and thread safe.

    #### A note about memory management, read+write and read-only modes
    The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.

    Note that in read+write mode, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read-only mode and then read `root_node_id`.

    These optimizations trade off memory use for performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a while to run. Use `cache_vacate` if you are over your memory limits.

    #### IPFS HAMT Sample Code
    ```python
    kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
    hamt = await HAMT.build(cas=kubo_cas)
    await hamt.set("foo", "bar")
    assert (await hamt.get("foo")) == "bar"
    await hamt.make_read_only()
    cid = hamt.root_node_id  # our root node CID
    print(cid)
    ```
    """

    def __init__(
        self,
        cas: ContentAddressedStore,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        root_node_id: IPLDKind | None = None,
        read_only: bool = False,
        max_bucket_size: int = 4,
        values_are_bytes: bool = False,
    ):
        """
        Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
        """

        self.cas: ContentAddressedStore = cas
        """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

        self.hash_fn: Callable[[bytes], bytes] = hash_fn
        """
        This is the hash function used to place a key-value within the HAMT.

        To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

        It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a Python bytes object can only represent data in byte-sized segments.

        Theoretically the hash only needs to be a minimum of 1 byte, and the number of keys colliding on the same hash can be at most the bucket size. Any more and the HAMT will most likely throw errors.
        """

        self.lock: asyncio.Lock = asyncio.Lock()
        """@private"""

        self.values_are_bytes: bool = values_are_bytes
        """Set this to true if you are only going to be storing Python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

        This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
        """

        if max_bucket_size < 1:
            raise ValueError("Bucket size maximum must be a positive integer")
        self.max_bucket_size: int = max_bucket_size
        """
        This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left at its default.

        This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, and thus more time taken to retrieve and decode each node from your backing CAS.

        This must be a positive integer with a minimum of 1.
        """

        self.root_node_id: IPLDKind = root_node_id
        """
        This is type IPLDKind but the documentation generator pdoc mangles it a bit.

        Read from this only when in read mode to get something valid!
        """

        self.read_only: bool = read_only
        """Clients should NOT modify this.

        This is here for checking whether the HAMT is in read only or read/write mode.

        The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
        """
        self.node_store: NodeStore
        """@private"""
        if read_only:
            self.node_store = ReadCacheStore(self)
        else:
            self.node_store = InMemoryTreeStore(self)

    @classmethod
    async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
        """
        Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are exactly the same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

        This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow for an async `__init__`, so this method is provided separately.
        """
        hamt = cls(*args, **kwargs)
        if hamt.root_node_id is None:
            hamt.root_node_id = await hamt.node_store.save(None, Node())
        return hamt

    # This is typically a massive blocking operation, you don't want to be running this concurrently with a bunch of other operations
    async def make_read_only(self) -> None:
        """
        Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

        In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
        """
        async with self.lock:
            inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
            await inmemory_tree.vacate()

            self.read_only = True
            self.node_store = ReadCacheStore(self)

    async def enable_write(self) -> None:
        """
        Enable both reads and writes. This creates an internal structure for performance optimizations which results in the root node ID no longer being valid; to read it at the end of your operations, you must first use `make_read_only`.
        """
        async with self.lock:
            # The read cache has no writes that need to be sent upstream so we can remove it without vacating
            self.read_only = False
            self.node_store = InMemoryTreeStore(self)

    async def cache_size(self) -> int:
        """
        Returns the memory used by some internal performance optimization tools in bytes.

        This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

        Be warned that this may take a while to run for large HAMTs.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            return self.node_store.size()
        async with self.lock:
            return self.node_store.size()

    async def cache_vacate(self) -> None:
        """
        Vacate and completely empty out the internal read/write cache.

        Be warned that this may take a while if there have been a lot of write operations.

        For more on memory management, see the `HAMT` class documentation.
        """
        if self.read_only:
            await self.node_store.vacate()
        else:
            async with self.lock:
                await self.node_store.vacate()

    async def _reserialize_and_link(
        self, node_stack: list[tuple[IPLDKind, Node]]
    ) -> None:
        """
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store.
        Takes a stack of nodes; we represent a stack with a list where the first element is the root element and the last element is the top of the stack.
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python.
        If a node ends up being empty, then it is deleted entirely, unless it is the root node.
        Modifies in place.
        """
        # iterate in the reverse direction, this range goes from n-1 to 0, from the bottommost tree node to the root
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            is_root: bool = stack_index == 0
            if node.is_empty() and not is_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                # When removing links, don't worry about two nodes having the same link since all nodes are guaranteed to be different by the removal of empty nodes after every single operation
                for link_index in prev_node.iter_link_indices():
                    link = prev_node.get_link(link_index)
                    if link == old_id:
                        # Delete the link by making it an empty bucket
                        prev_node.data[link_index] = {}
                        break

                # Remove from our stack, continue reserializing up the tree
                node_stack.pop(stack_index)
                continue

            # If not an empty node, just reserialize like normal and replace this one
            new_store_id: IPLDKind = await self.node_store.save(old_id, node)
            node_stack[stack_index] = (new_store_id, node)

            # If this is not the root node, we need to change the linking of the node prior in the list since we just reserialized
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node.replace_link(old_id, new_store_id)

    # automatically skip encoding if the value provided is of the bytes variety
    async def set(self, key: str, val: IPLDKind) -> None:
        """Write a key-value mapping."""
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        data: bytes
        if self.values_are_bytes:
            data = cast(
                bytes, val
            )  # let users get an exception if they pass in a non-bytes value when they want to skip encoding
        else:
            data = dag_cbor.encode(val)

        pointer: IPLDKind = await self.cas.save(data, codec="raw")
        await self._set_pointer(key, pointer)

    async def _set_pointer(self, key: str, val_ptr: IPLDKind) -> None:
        async with self.lock:
            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            # FIFO queue to keep track of all the KVs we need to insert
            # This is needed if any buckets overflow and so we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = []
            kvs_queue.append((key, val_ptr))

            while len(kvs_queue) > 0:
                _, top_node = node_stack[-1]
                curr_key, curr_val_ptr = kvs_queue[0]

                raw_hash: bytes = self.hash_fn(curr_key.encode())
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, list):
                    next_node_id: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(next_node_id)
                    node_stack.append((next_node_id, next_node))
                elif isinstance(item, dict):
                    bucket: dict[str, IPLDKind] = item

                    # If this bucket already has this same key, or has space, just rewrite the value and then go work on the others in the queue
                    if curr_key in bucket or len(bucket) < self.max_bucket_size:
                        bucket[curr_key] = curr_val_ptr
                        kvs_queue.pop(0)
                        continue

                    # The current key is not in the bucket and the bucket is too full, so empty KVs from the bucket and restart insertion
                    for k in bucket:
                        v_ptr = bucket[k]
                        kvs_queue.append((k, v_ptr))

                    # Create a new link to a new node so that we can reflow these KVs into a new subtree
                    new_node = Node()
                    new_node_id: IPLDKind = await self.node_store.save(None, new_node)
                    link: list[IPLDKind] = [new_node_id]
                    top_node.data[map_key] = link

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]

    async def delete(self, key: str) -> None:
        """Delete a key-value mapping."""

        # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        async with self.lock:
            raw_hash: bytes = self.hash_fn(key.encode())

            node_stack: list[tuple[IPLDKind, Node]] = []
            root_node: Node = await self.node_store.load(self.root_node_id)
            node_stack.append((self.root_node_id, root_node))

            created_change: bool = False
            while True:
                _, top_node = node_stack[-1]
                map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

                item = top_node.data[map_key]
                if isinstance(item, dict):
                    bucket = item
                    if key in bucket:
                        del bucket[key]
                        created_change = True
                    # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                    break
                elif isinstance(item, list):
                    link: IPLDKind = item[0]
                    next_node: Node = await self.node_store.load(link)
                    node_stack.append((link, next_node))

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                await self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]
            else:
                # If we didn't make a change, then this key must not exist within the HAMT
                raise KeyError

    async def get(self, key: str) -> IPLDKind:
        """Get a value."""
        pointer: IPLDKind = await self.get_pointer(key)
        data: bytes = await self.cas.load(pointer)
        if self.values_are_bytes:
            return data
        else:
            return dag_cbor.decode(data)

    async def get_pointer(self, key: str) -> IPLDKind:
        """
        Get a store ID that points to the value for this key.

        This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by Python, so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore`, for example.
        """
        # If read only, no need to acquire a lock
        pointer: IPLDKind
        if self.read_only:
            pointer = await self._get_pointer(key)
        else:
            async with self.lock:
                pointer = await self._get_pointer(key)

        return pointer

    # Callers MUST handle acquiring a lock
    async def _get_pointer(self, key: str) -> IPLDKind:
        raw_hash: bytes = self.hash_fn(key.encode())

        current_id: IPLDKind = self.root_node_id
        current_depth: int = 0

        # Don't check if result is None but use a boolean to indicate finding something, since None is a possible value of IPLDKind
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id: IPLDKind = current_id
            top_node: Node = await self.node_store.load(top_id)
            map_key: int = extract_bits(raw_hash, current_depth, 8)

            # Check if this key is in one of the buckets
            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    result_ptr = bucket[key]
                    found_a_result = True
                    break

            if isinstance(item, list):
                link: IPLDKind = item[0]
                current_id = link
                current_depth += 1
                continue

            # Nowhere left to go, stop walking down the tree
            break

        if not found_a_result:
            raise KeyError

        return result_ptr

    # Callers MUST handle locking or not on their own
    async def _iter_nodes(self) -> AsyncIterator[tuple[IPLDKind, Node]]:
        node_id_stack: list[IPLDKind] = [self.root_node_id]
        while len(node_id_stack) > 0:
            top_id: IPLDKind = node_id_stack.pop()
            node: Node = await self.node_store.load(top_id)
            yield (top_id, node)
            node_id_stack.extend(list(node.iter_links()))

    async def keys(self) -> AsyncIterator[str]:
        """
        AsyncIterator returning all keys in the HAMT.

        If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

        When the HAMT is in read only mode however, this can be run concurrently with get operations.
        """
        if self.read_only:
            async for k in self._keys_no_locking():
                yield k
        else:
            async with self.lock:
                async for k in self._keys_no_locking():
                    yield k

    async def _keys_no_locking(self) -> AsyncIterator[str]:
        async for _, node in self._iter_nodes():
            for bucket in node.iter_buckets():
                for key in bucket:
                    yield key

    async def len(self) -> int:
        """
        Return the number of key-value mappings in this HAMT.

        When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read only, then this can be run concurrently with other operations.
        """
        count: int = 0
        async for _ in self.keys():
            count += 1

        return count
````
An implementation of a Hash Array Mapped Trie for an arbitrary Content Addressed Storage (CAS) system, e.g. IPFS. This uses the IPLD data model.
Use this to store arbitrarily large key-value mappings in your CAS of choice.
For writing, this HAMT is async safe but NOT thread safe. Only write in an async event loop within the same thread.
When in read-only mode, the HAMT is both async and thread safe.
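To make the concurrency contract concrete, here is a minimal sketch using the exported `InMemoryCAS`: writes are awaited one at a time from a single event loop, while a read-only HAMT can serve many `get` calls concurrently.

```python
import asyncio

from py_hamt import HAMT, InMemoryCAS

async def main() -> None:
    hamt = await HAMT.build(cas=InMemoryCAS())

    # Read+write mode: await writes sequentially within one event loop
    for i in range(100):
        await hamt.set(f"key{i}", i)

    # Read-only mode: concurrent reads are safe
    await hamt.make_read_only()
    values = await asyncio.gather(*(hamt.get(f"key{i}") for i in range(100)))
    assert values == list(range(100))

asyncio.run(main())
```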
A note about memory management, read+write and read-only modes
The HAMT can be in either read+write mode or read-only mode. For either of these modes, the HAMT has some internal performance optimizations.
Note that in read+write mode, the real root node id IS NOT VALID. You should call `make_read_only()` to convert to read-only mode and then read `root_node_id`.
These optimizations trade off memory use for performance. Use `cache_size` to monitor the approximate memory usage. Be warned that for large key-value mapping sets this may take a while to run. Use `cache_vacate` if you are over your memory limits.
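As a sketch of what this can look like in practice (assuming an existing write-enabled `hamt` and an arbitrary byte budget), you might poll `cache_size` periodically during a bulk load and vacate once it crosses your limit:

```python
# Hypothetical bulk load that keeps the internal cache under a budget
MEMORY_BUDGET_BYTES = 256 * 1024 * 1024  # 256 MiB, an arbitrary example limit

for i in range(1_000_000):
    await hamt.set(f"key{i}", i)
    # Checking on every write would be slow; sample every 10,000 writes
    if i % 10_000 == 0 and (await hamt.cache_size()) > MEMORY_BUDGET_BYTES:
        await hamt.cache_vacate()
```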
IPFS HAMT Sample Code
```python
kubo_cas = KuboCAS()  # connects to a local kubo node with the default endpoints
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("foo", "bar")
assert (await hamt.get("foo")) == "bar"
await hamt.make_read_only()
cid = hamt.root_node_id  # our root node CID
print(cid)
```
```python
def __init__(
    self,
    cas: ContentAddressedStore,
    hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
    root_node_id: IPLDKind | None = None,
    read_only: bool = False,
    max_bucket_size: int = 4,
    values_are_bytes: bool = False,
):
    """
    Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
    """

    self.cas: ContentAddressedStore = cas
    """The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS."""

    self.hash_fn: Callable[[bytes], bytes] = hash_fn
    """
    This is the hash function used to place a key-value within the HAMT.

    To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.

    It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a Python bytes object can only represent data in byte-sized segments.

    Theoretically the hash only needs to be a minimum of 1 byte, and the number of keys colliding on the same hash can be at most the bucket size. Any more and the HAMT will most likely throw errors.
    """

    self.lock: asyncio.Lock = asyncio.Lock()
    """@private"""

    self.values_are_bytes: bool = values_are_bytes
    """Set this to true if you are only going to be storing Python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.

    This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
    """

    if max_bucket_size < 1:
        raise ValueError("Bucket size maximum must be a positive integer")
    self.max_bucket_size: int = max_bucket_size
    """
    This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left at its default.

    This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, and thus more time taken to retrieve and decode each node from your backing CAS.

    This must be a positive integer with a minimum of 1.
    """

    self.root_node_id: IPLDKind = root_node_id
    """
    This is type IPLDKind but the documentation generator pdoc mangles it a bit.

    Read from this only when in read mode to get something valid!
    """

    self.read_only: bool = read_only
    """Clients should NOT modify this.

    This is here for checking whether the HAMT is in read only or read/write mode.

    The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
    """
    self.node_store: NodeStore
    """@private"""
    if read_only:
        self.node_store = ReadCacheStore(self)
    else:
        self.node_store = InMemoryTreeStore(self)
```
Use `build` if you need to create a completely empty HAMT, as this requires some async operations with the CAS. For what each of the constructor input variables refers to, check the documentation with the matching names below.
The backing storage system. py-hamt provides an implementation `KuboCAS` for IPFS.
This is the hash function used to place a key-value within the HAMT.
To provide your own hash function, create a function that takes in arbitrarily long bytes and returns the hash bytes.
It's important to note that the resulting hash must always be a whole number of bytes (a multiple of 8 bits), since a Python bytes object can only represent data in byte-sized segments.
Theoretically the hash only needs to be a minimum of 1 byte, and the number of keys colliding on the same hash can be at most the bucket size. Any more and the HAMT will most likely throw errors.
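For example, a custom hash function built on SHA-256 from Python's standard library meets these requirements, since its 32-byte digest is a whole number of bytes. This is an illustrative sketch, not something py-hamt ships:

```python
import hashlib

def sha256_hashfn(input_bytes: bytes) -> bytes:
    # 32-byte digest: a whole number of bytes, comfortably above the 1-byte minimum
    return hashlib.sha256(input_bytes).digest()

# hamt = await HAMT.build(cas=cas, hash_fn=sha256_hashfn)
```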
Set this to true if you are only going to be storing Python bytes objects in the HAMT. This will improve performance by skipping a serialization step from IPLDKind.
This is theoretically safe to change in between operations, but this has not been verified in testing, so only do this at your own risk.
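A sketch of the bytes-only mode (assuming an existing `cas`); note that the caller is then responsible for passing `bytes` values:

```python
hamt = await HAMT.build(cas=cas, values_are_bytes=True)
await hamt.set("chunk/0/0", b"\x00\x01\x02")  # stored as-is, skipping dag-cbor encoding
assert (await hamt.get("chunk/0/0")) == b"\x00\x01\x02"
```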
This is only important for tuning performance when writing! For reading a HAMT that was written with a different max bucket size, this does not need to match and can be left at its default.
This is an internal detail that has been exposed for performance tuning. The HAMT handles large key-value mapping sets even on a content addressed system by essentially sharding all the mappings across many smaller Nodes. The memory footprint of each of these Nodes is a linear function of the maximum bucket size. Larger bucket sizes will result in larger Nodes, and thus more time taken to retrieve and decode each node from your backing CAS.
This must be a positive integer with a minimum of 1.
This is type IPLDKind but the documentation generator pdoc mangles it a bit.
Read from this only when in read mode to get something valid!
Clients should NOT modify this.
This is here for checking whether the HAMT is in read only or read/write mode.
The distinction is made for performance and correctness reasons. In read only mode, the HAMT has an internal read cache that can speed up operations. In read/write mode, the HAMT maintains strong consistency for reads by using async locks, and for writes it writes to an in-memory buffer rather than performing (possibly) network calls to the underlying CAS.
```python
@classmethod
async def build(cls, *args: Any, **kwargs: Any) -> "HAMT":
    """
    Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are exactly the same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.

    This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow for an async `__init__`, so this method is provided separately.
    """
    hamt = cls(*args, **kwargs)
    if hamt.root_node_id is None:
        hamt.root_node_id = await hamt.node_store.save(None, Node())
    return hamt
```
Use this if you are initializing a completely empty HAMT! That means passing in None for the root_node_id. Method arguments are exactly the same as `__init__`. If the root_node_id is not None, this is no different from creating a HAMT instance with `__init__`.
This separate async method is required since initializing an empty HAMT means sending some internal objects to the underlying CAS, which requires async operations. Python does not allow for an async `__init__`, so this method is provided separately.
```python
async def make_read_only(self) -> None:
    """
    Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.

    In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
    """
    async with self.lock:
        inmemory_tree: InMemoryTreeStore = cast(InMemoryTreeStore, self.node_store)
        await inmemory_tree.vacate()

        self.read_only = True
        self.node_store = ReadCacheStore(self)
```
Makes the HAMT read only, which allows for more parallel read operations. The HAMT also needs to be in read only mode to get the real root node ID.
In read+write mode, the HAMT normally has to block separate get calls to enable strong consistency in case a set/delete operation falls in between.
```python
async def enable_write(self) -> None:
    """
    Enable both reads and writes. This creates an internal structure for performance optimizations which results in the root node ID no longer being valid; to read it at the end of your operations, you must first use `make_read_only`.
    """
    async with self.lock:
        # The read cache has no writes that need to be sent upstream so we can remove it without vacating
        self.read_only = False
        self.node_store = InMemoryTreeStore(self)
```
Enable both reads and writes. This creates an internal structure for performance optimizations which results in the root node ID no longer being valid; to read it at the end of your operations, you must first use `make_read_only`.
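Putting the two mode switches together, a sketch of a full round trip (reusing the `kubo_cas` from the class sample code):

```python
hamt = await HAMT.build(cas=kubo_cas)
await hamt.set("a", 1)

await hamt.make_read_only()
first_root = hamt.root_node_id   # valid only now, in read-only mode

await hamt.enable_write()        # first_root still identifies the earlier frozen snapshot
await hamt.set("b", 2)

await hamt.make_read_only()
second_root = hamt.root_node_id  # a new root after the additional write
```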
```python
async def cache_size(self) -> int:
    """
    Returns the memory used by some internal performance optimization tools in bytes.

    This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish, however.

    Be warned that this may take a while to run for large HAMTs.

    For more on memory management, see the `HAMT` class documentation.
    """
    if self.read_only:
        return self.node_store.size()
    async with self.lock:
        return self.node_store.size()
```
Returns the memory used by some internal performance optimization tools in bytes.
This is async concurrency safe, so call it whenever. This does mean it will block and wait for other writes to finish however.
Be warned that this may take a while to run for large HAMTs.
For more on memory management, see the `HAMT` class documentation.
```python
async def cache_vacate(self) -> None:
    """
    Vacate and completely empty out the internal read/write cache.

    Be warned that this may take a while if there have been a lot of write operations.

    For more on memory management, see the `HAMT` class documentation.
    """
    if self.read_only:
        await self.node_store.vacate()
    else:
        async with self.lock:
            await self.node_store.vacate()
```
Vacate and completely empty out the internal read/write cache.
Be warned that this may take a while if there have been a lot of write operations.
For more on memory management, see the `HAMT` class documentation.
```python
async def set(self, key: str, val: IPLDKind) -> None:
    """Write a key-value mapping."""
    if self.read_only:
        raise Exception("Cannot call set on a read only HAMT")

    data: bytes
    if self.values_are_bytes:
        data = cast(
            bytes, val
        )  # let users get an exception if they pass in a non-bytes value when they want to skip encoding
    else:
        data = dag_cbor.encode(val)

    pointer: IPLDKind = await self.cas.save(data, codec="raw")
    await self._set_pointer(key, pointer)
```
Write a key-value mapping.
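Values can be any IPLDKind data; unless `values_are_bytes` is set, the value is dag-cbor encoded and saved to the CAS, with only the resulting pointer stored in the HAMT itself. A small sketch:

```python
await hamt.set("meta", {"version": 3, "tags": ["zarr", "ipfs"]})
assert (await hamt.get("meta")) == {"version": 3, "tags": ["zarr", "ipfs"]}
```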
```python
async def delete(self, key: str) -> None:
    """Delete a key-value mapping."""

    # Also deletes the pointer at the same time so this doesn't have a _delete_pointer duo
    if self.read_only:
        raise Exception("Cannot call delete on a read only HAMT")

    async with self.lock:
        raw_hash: bytes = self.hash_fn(key.encode())

        node_stack: list[tuple[IPLDKind, Node]] = []
        root_node: Node = await self.node_store.load(self.root_node_id)
        node_stack.append((self.root_node_id, root_node))

        created_change: bool = False
        while True:
            _, top_node = node_stack[-1]
            map_key: int = extract_bits(raw_hash, len(node_stack) - 1, 8)

            item = top_node.data[map_key]
            if isinstance(item, dict):
                bucket = item
                if key in bucket:
                    del bucket[key]
                    created_change = True
                # Break out since whether or not the key is in the bucket, it should have been here, so either reserialize now or raise a KeyError
                break
            elif isinstance(item, list):
                link: IPLDKind = item[0]
                next_node: Node = await self.node_store.load(link)
                node_stack.append((link, next_node))

        # Finally, reserialize and fix all links, deleting empty nodes as needed
        if created_change:
            await self._reserialize_and_link(node_stack)
            self.root_node_id = node_stack[0][0]
        else:
            # If we didn't make a change, then this key must not exist within the HAMT
            raise KeyError
```
Delete a key-value mapping.
```python
async def get(self, key: str) -> IPLDKind:
    """Get a value."""
    pointer: IPLDKind = await self.get_pointer(key)
    data: bytes = await self.cas.load(pointer)
    if self.values_are_bytes:
        return data
    else:
        return dag_cbor.decode(data)
```
Get a value.
```python
async def get_pointer(self, key: str) -> IPLDKind:
    """
    Get a store ID that points to the value for this key.

    This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by Python, so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore`, for example.
    """
    # If read only, no need to acquire a lock
    pointer: IPLDKind
    if self.read_only:
        pointer = await self._get_pointer(key)
    else:
        async with self.lock:
            pointer = await self._get_pointer(key)

    return pointer
```
Get a store ID that points to the value for this key.
This is useful for some applications that want to implement a read cache. Due to the restrictions of `ContentAddressedStore` on IDs, pointers are regarded as immutable by Python, so they can be easily used as IDs for read caches. This is utilized in `ZarrHAMTStore`, for example.
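A sketch of the kind of read cache this enables; `pointer_cache` and `cached_get` are hypothetical names, not part of py-hamt:

```python
pointer_cache: dict = {}  # pointer -> value; pointers are immutable, so safe as dict keys

async def cached_get(hamt: HAMT, key: str):
    ptr = await hamt.get_pointer(key)
    if ptr not in pointer_cache:
        pointer_cache[ptr] = await hamt.get(key)
    return pointer_cache[ptr]
```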
```python
async def keys(self) -> AsyncIterator[str]:
    """
    AsyncIterator returning all keys in the HAMT.

    If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.

    When the HAMT is in read only mode however, this can be run concurrently with get operations.
    """
    if self.read_only:
        async for k in self._keys_no_locking():
            yield k
    else:
        async with self.lock:
            async for k in self._keys_no_locking():
                yield k
```
AsyncIterator returning all keys in the HAMT.
If the HAMT is write enabled, to maintain strong consistency this will obtain an async lock and not allow any other operations to proceed.
When the HAMT is in read only mode however, this can be run concurrently with get operations.
```python
async def len(self) -> int:
    """
    Return the number of key-value mappings in this HAMT.

    When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read only, then this can be run concurrently with other operations.
    """
    count: int = 0
    async for _ in self.keys():
        count += 1

    return count
```
Return the number of key-value mappings in this HAMT.

When the HAMT is write enabled, to maintain strong consistency it will acquire a lock and thus not allow any other operations to proceed until the length has been fully calculated. If read only, then this can be run concurrently with other operations.
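Usage sketch for both: iterate keys with `async for`, and note that `len` simply counts that same iterator:

```python
async for key in hamt.keys():
    print(key)

total = await hamt.len()  # equivalent to counting the iteration above
```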
```python
class ContentAddressedStore(ABC):
    """
    Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.

    Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.

    #### A note on the IPLDKind return types
    Save and load return the type IPLDKind and not just a CID. As long as Python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
    1. No lists or dicts, since Python does not classify these as immutable.
    2. No `None` values, since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
    """

    CodecInput = Literal["raw", "dag-cbor"]

    @abstractmethod
    async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
        """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

        `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
        """

    @abstractmethod
    async def load(self, id: IPLDKind) -> bytes:
        """Retrieve data."""
```
Abstract class that represents a content addressed storage that the `HAMT` can use for keeping data.
Note that the return type of save and input to load is really type `IPLDKind`, but the documentation generator pdoc mangles it unfortunately.
A note on the IPLDKind return types
Save and load return the type IPLDKind and not just a CID. As long as Python regards the underlying type as immutable it can be used, allowing for more flexibility. There are two exceptions:
- No lists or dicts, since Python does not classify these as immutable.
- No `None` values, since this is used in HAMT's `__init__` to indicate that an empty HAMT needs to be initialized.
```python
@abstractmethod
async def save(self, data: bytes, codec: CodecInput) -> IPLDKind:
    """Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

    `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
    """
```
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

`codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
Retrieve data.
```python
class InMemoryCAS(ContentAddressedStore):
    """Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory Python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    hash_alg: Multihash

    def __init__(self):
        self.store = dict()
        self.hash_alg = multihash.get("blake3")

    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
        hash: bytes = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

    async def load(self, id: IPLDKind) -> bytes:
        """
        `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
        backend we *require* a `bytes` hash; anything else is rejected at run
        time. In OO type-checking, a subclass may widen (make more general) argument types,
        but it must never narrow them; otherwise callers that expect the base-class contract can break.
        Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
        This is why we use `cast` here, to tell mypy that we know what we are doing.
        h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
        """
        key = cast(bytes, id)
        if not isinstance(key, (bytes, bytearray)):  # defensive guard
            raise TypeError(
                f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
            )

        try:
            return self.store[key]
        except KeyError as exc:
            raise KeyError("Object not found in in-memory store") from exc
```
Used mostly for faster testing. It hashes all inputs and uses that as a key to an in-memory Python dict, mimicking a content addressed storage system. The hash bytes are the ID that `save` returns and `load` takes in.
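A quick sketch of it in action, showing that the blake3 hash bytes returned by `save` are the handle passed back into `load`:

```python
import asyncio

from py_hamt import InMemoryCAS

async def main() -> None:
    cas = InMemoryCAS()
    blob_id = await cas.save(b"hello", codec="raw")  # 32 blake3 hash bytes
    assert await cas.load(blob_id) == b"hello"

asyncio.run(main())
```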
```python
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> bytes:
    hash: bytes = self.hash_alg.digest(data, size=32)
    self.store[hash] = data
    return hash
```
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type.

`codec` will be set to "dag-cbor" if this data should be marked as special linked data a la IPLD data model.
```python
async def load(self, id: IPLDKind) -> bytes:
    """
    `ContentAddressedStore` allows any IPLD scalar key. For the in-memory
    backend we *require* a `bytes` hash; anything else is rejected at run
    time. In OO type-checking, a subclass may widen (make more general) argument types,
    but it must never narrow them; otherwise callers that expect the base-class contract can break.
    Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error.
    This is why we use `cast` here, to tell mypy that we know what we are doing.
    h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
    """
    key = cast(bytes, id)
    if not isinstance(key, (bytes, bytearray)):  # defensive guard
        raise TypeError(
            f"InMemoryCAS only supports byte-hash keys; got {type(id).__name__}"
        )

    try:
        return self.store[key]
    except KeyError as exc:
        raise KeyError("Object not found in in-memory store") from exc
```
`ContentAddressedStore` allows any IPLD scalar key. For the in-memory backend we *require* a `bytes` hash; anything else is rejected at run time. In OO type-checking, a subclass may widen (make more general) argument types, but it must never narrow them; otherwise callers that expect the base-class contract can break. Mypy enforces this contra-variance rule and emits the "violates Liskov substitution principle" error. This is why we use `cast` here, to tell mypy that we know what we are doing. h/t https://stackoverflow.com/questions/75209249/overriding-a-method-mypy-throws-an-incompatible-with-super-type-error-when-ch
````python
class KuboCAS(ContentAddressedStore):
    """
    Connects to an **IPFS Kubo** daemon.

    The IDs in save and load are IPLD CIDs.

    * **save()** → RPC (`/api/v0/add`)
    * **load()** → HTTP gateway (`/ipfs/{cid}`)

    `save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.

    ### Authentication / custom headers
    You have two options:

    1. **Bring your own `httpx.AsyncClient`**
       Pass it via `client=...` — any default headers or auth
       configured on that client are reused for **every** request.
    2. **Let `KuboCAS` build the client** but pass
       `headers=` and/or `auth=` kwargs; they are forwarded to the
       internally-created `AsyncClient`.

    ```python
    import httpx
    from py_hamt import KuboCAS

    # Option 1: user-supplied client
    client = httpx.AsyncClient(
        headers={"Authorization": "Bearer <token>"},
        auth=("user", "pass"),
    )
    cas = KuboCAS(client=client)

    # Option 2: let KuboCAS create the client
    cas = KuboCAS(
        headers={"X-My-Header": "yes"},
        auth=("user", "pass"),
    )
    ```

    ### Parameters
    - **hasher** (str): multihash name (defaults to *blake3*).
    - **client** (`httpx.AsyncClient | None`): reuse an existing
      client; if *None* KuboCAS will create one lazily.
    - **headers** (`dict[str, str] | None`): default headers for the
      internally-created client.
    - **auth** (`tuple[str, str] | None`): authentication tuple (username, password)
      for the internally-created client.
    - **rpc_base_url / gateway_base_url** (`str | None`): override daemon
      endpoints (defaults match the local daemon ports).
    - **chunker** (str): chunking algorithm specification for Kubo's `add`
      RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or
      `"rabin-<min>-<avg>-<max>"`.

    ...
    """

    KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL: str = "http://127.0.0.1:8080"
    KUBO_DEFAULT_LOCAL_RPC_BASE_URL: str = "http://127.0.0.1:5001"

    DAG_PB_MARKER: int = 0x70
    """@private"""

    # Take in a httpx client that can be reused across POSTs and GETs to a specific IPFS daemon
    def __init__(
        self,
        hasher: str = "blake3",
        client: httpx.AsyncClient | None = None,
        rpc_base_url: str | None = None,
        gateway_base_url: str | None = None,
        concurrency: int = 32,
        *,
        headers: dict[str, str] | None = None,
        auth: Tuple[str, str] | None = None,
        chunker: str = "size-1048576",
    ):
        """
        If None is passed in for the RPC or gateway base URL, then the default for local kubo daemons will be used. The default local values will also be used if nothing is passed in at all.

        ### `httpx.AsyncClient` Management
        If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time using `await cas.aclose()`, as a class instance cannot know when it will no longer be in use unless explicitly told so.

        If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited, which is what we suggest:
        ```python
        async with httpx.AsyncClient() as client, KuboCAS(
            rpc_base_url=rpc_base_url,
            gateway_base_url=gateway_base_url,
            client=client,
        ) as kubo_cas:
            hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
            zhs = ZarrHAMTStore(hamt)
            # Use the KuboCAS instance as needed
            # ...
        ```
        As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
        ```python
        cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
        # Use the KuboCAS instance as needed
        # ...
        await cas.aclose()  # Ensure resources are cleaned up
        ```

        ### Authenticated RPC/Gateway Access
        Users can set whatever headers and auth credentials they need when connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
        Alternatively, they can pass `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
        If you do not need authentication, you can leave these parameters as `None`.

        ### RPC and HTTP Gateway Base URLs
        These are the first part of the URL; the defaults match the ports a local kubo daemon launches with.
        """

        chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
        if re.fullmatch(chunker_pattern, chunker) is None:
            raise ValueError("Invalid chunker specification")
        self.chunker: str = chunker

        self.hasher: str = hasher
        """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default, blake3, matches the default hash function used by HAMT."""

        if rpc_base_url is None:
            rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
        if gateway_base_url is None:
            gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

        if "/ipfs/" in gateway_base_url:
            gateway_base_url = gateway_base_url.split("/ipfs/")[0]

        # Standard gateway URL construction with proper path handling
        if gateway_base_url.endswith("/"):
            gateway_base_url = f"{gateway_base_url}ipfs/"
        else:
            gateway_base_url = f"{gateway_base_url}/ipfs/"

        self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
        """@private"""
        self.gateway_base_url: str = gateway_base_url
        """@private"""

        self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}

        if client is not None:
            # user supplied → bind it to *their* current loop
            self._client_per_loop[asyncio.get_running_loop()] = client
            self._owns_client: bool = False
        else:
            self._owns_client = True  # we'll create clients lazily

        # store for later use by _loop_client()
        self._default_headers = headers
        self._default_auth = auth

        self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
        self._closed: bool = False

    # --------------------------------------------------------------------- #
    # helper: get or create the client bound to the current running loop    #
    # --------------------------------------------------------------------- #
    def _loop_client(self) -> httpx.AsyncClient:
        """Get or create a client for the current event loop."""
        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        try:
            return self._client_per_loop[loop]
        except KeyError:
            # Create a new client
            client = httpx.AsyncClient(
                timeout=60.0,
                headers=self._default_headers,
                auth=self._default_auth,
                limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
                # Uncomment when they finally support robust HTTP/2 GOAWAY responses
                # http2=True,
            )
            self._client_per_loop[loop] = client
            return client

    # --------------------------------------------------------------------- #
    # graceful shutdown: close **all** clients we own                       #
    # --------------------------------------------------------------------- #
    async def aclose(self) -> None:
        """Close all internally-created clients."""
        if not self._owns_client:
            # User supplied the client; they are responsible for closing it.
            return

        for client in list(self._client_per_loop.values()):
            if not client.is_closed:
                try:
                    await client.aclose()
                except Exception:
                    # Best-effort cleanup; ignore errors during shutdown
                    pass

        self._client_per_loop.clear()
        self._closed = True

    # At this point, _client_per_loop should be empty or only contain
    # clients from loops we haven't seen (which shouldn't happen in practice)
    async def __aenter__(self) -> "KuboCAS":
        return self

    async def __aexit__(self, *exc: Any) -> None:
        await self.aclose()

    def __del__(self) -> None:
        """Best-effort close for internally-created clients."""
        if not self._owns_client or self._closed:
            return

        # Attempt proper cleanup if possible
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        try:
            if loop is None or not loop.is_running():
                asyncio.run(self.aclose())
            else:
                loop.create_task(self.aclose())
        except Exception:
            # Suppress all errors during interpreter shutdown or loop teardown
            pass

    # --------------------------------------------------------------------- #
    # save() – now uses the per-loop client                                 #
    # --------------------------------------------------------------------- #
    async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
        async with self._sem:  # throttle RPC
            # Create multipart form data
            files = {"file": data}

            # Send the POST request
            client = self._loop_client()
            response = await client.post(self.rpc_url, files=files)
            response.raise_for_status()
            cid_str: str = response.json()["Hash"]

            cid: CID = CID.decode(cid_str)
            if cid.codec.code != self.DAG_PB_MARKER:
                cid = cid.set(codec=codec)
            return cid

    async def load(self, id: IPLDKind) -> bytes:
        """@private"""
        cid = cast(CID, id)  # CID is definitely in the IPLDKind type
        url: str = f"{self.gateway_base_url + str(cid)}"

        async with self._sem:  # throttle gateway
            client = self._loop_client()
            response = await client.get(url)
            response.raise_for_status()
            return response.content
````
Connects to an IPFS Kubo daemon.
The IDs in save and load are IPLD CIDs.
- `save()` → RPC (`/api/v0/add`)
- `load()` → HTTP gateway (`/ipfs/{cid}`)
`save` uses the RPC API and `load` uses the HTTP Gateway. This means that read-only HAMTs will only access the HTTP Gateway, so no RPC endpoint is required for use.
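A sketch of gateway-only read access (the gateway URL and `root_cid` here are illustrative placeholders):

```python
# No RPC endpoint is needed when only loading data
cas = KuboCAS(gateway_base_url="https://ipfs.example.org")
hamt = await HAMT.build(
    cas=cas, root_node_id=root_cid, read_only=True, values_are_bytes=True
)
value = await hamt.get("some/key")  # served entirely via /ipfs/{cid} gateway GETs
```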
Authentication / custom headers
You have two options:
1. **Bring your own `httpx.AsyncClient`**: pass it via `client=...` — any default headers or auth configured on that client are reused for **every** request.
2. **Let `KuboCAS` build the client**, but pass `headers=` and/or `auth=` kwargs; they are forwarded to the internally-created `AsyncClient`.
```python
import httpx
from py_hamt import KuboCAS

# Option 1: user-supplied client
client = httpx.AsyncClient(
    headers={"Authorization": "Bearer <token>"},
    auth=("user", "pass"),
)
cas = KuboCAS(client=client)

# Option 2: let KuboCAS create the client
cas = KuboCAS(
    headers={"X-My-Header": "yes"},
    auth=("user", "pass"),
)
```
Parameters
- **hasher** (`str`): multihash name (defaults to *blake3*).
- **client** (`httpx.AsyncClient | None`): reuse an existing client; if *None*, KuboCAS will create one lazily.
- **headers** (`dict[str, str] | None`): default headers for the internally-created client.
- **auth** (`tuple[str, str] | None`): authentication tuple (username, password) for the internally-created client.
- **rpc_base_url / gateway_base_url** (`str | None`): override daemon endpoints (defaults match the local daemon ports).
- **chunker** (`str`): chunking algorithm specification for Kubo's `add` RPC. Accepted formats are `"size-<positive int>"`, `"rabin"`, or `"rabin-<min>-<avg>-<max>"` (see the examples below).
...
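For example, these `chunker` values are accepted, while malformed specifications raise a `ValueError` at construction time:

```python
from py_hamt import KuboCAS

KuboCAS(chunker="size-262144")               # fixed-size 256 KiB chunks
KuboCAS(chunker="rabin")                     # content-defined chunking, default bounds
KuboCAS(chunker="rabin-16384-65536-131072")  # rabin-<min>-<avg>-<max>

try:
    KuboCAS(chunker="size-0")                # zero is not a positive integer
except ValueError:
    pass
```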
````python
def __init__(
    self,
    hasher: str = "blake3",
    client: httpx.AsyncClient | None = None,
    rpc_base_url: str | None = None,
    gateway_base_url: str | None = None,
    concurrency: int = 32,
    *,
    headers: dict[str, str] | None = None,
    auth: Tuple[str, str] | None = None,
    chunker: str = "size-1048576",
):
    """
    If None is passed in for the RPC or gateway base URL, then the default for local kubo daemons will be used. The default local values will also be used if nothing is passed in at all.

    ### `httpx.AsyncClient` Management
    If `client` is not provided, it will be automatically initialized. It is the responsibility of the user to close this at an appropriate time using `await cas.aclose()`, as a class instance cannot know when it will no longer be in use unless explicitly told so.

    If you are using the `KuboCAS` instance in an `async with` block, it will automatically close the client when the block is exited, which is what we suggest:
    ```python
    async with httpx.AsyncClient() as client, KuboCAS(
        rpc_base_url=rpc_base_url,
        gateway_base_url=gateway_base_url,
        client=client,
    ) as kubo_cas:
        hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
        zhs = ZarrHAMTStore(hamt)
        # Use the KuboCAS instance as needed
        # ...
    ```
    As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done using the instance to ensure that all resources are cleaned up.
    ```python
    cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
    # Use the KuboCAS instance as needed
    # ...
    await cas.aclose()  # Ensure resources are cleaned up
    ```

    ### Authenticated RPC/Gateway Access
    Users can set whatever headers and auth credentials they need when connecting to an authenticated kubo instance by setting them in their own `httpx.AsyncClient` and then passing that in.
    Alternatively, they can pass `headers` and `auth` parameters to the constructor, which will be used to create a new `httpx.AsyncClient` if one is not provided.
    If you do not need authentication, you can leave these parameters as `None`.

    ### RPC and HTTP Gateway Base URLs
    These are the first part of the URL; the defaults match the ports a local kubo daemon launches with.
    """

    chunker_pattern = r"(?:size-[1-9]\d*|rabin(?:-[1-9]\d*-[1-9]\d*-[1-9]\d*)?)"
    if re.fullmatch(chunker_pattern, chunker) is None:
        raise ValueError("Invalid chunker specification")
    self.chunker: str = chunker

    self.hasher: str = hasher
    """The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default, blake3, matches the default hash function used by HAMT."""

    if rpc_base_url is None:
        rpc_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_RPC_BASE_URL  # pragma
    if gateway_base_url is None:
        gateway_base_url = KuboCAS.KUBO_DEFAULT_LOCAL_GATEWAY_BASE_URL

    if "/ipfs/" in gateway_base_url:
        gateway_base_url = gateway_base_url.split("/ipfs/")[0]

    # Standard gateway URL construction with proper path handling
    if gateway_base_url.endswith("/"):
        gateway_base_url = f"{gateway_base_url}ipfs/"
    else:
        gateway_base_url = f"{gateway_base_url}/ipfs/"

    self.rpc_url: str = f"{rpc_base_url}/api/v0/add?hash={self.hasher}&chunker={self.chunker}&pin=false"
    """@private"""
    self.gateway_base_url: str = gateway_base_url
    """@private"""

    self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}

    if client is not None:
        # user supplied → bind it to *their* current loop
        self._client_per_loop[asyncio.get_running_loop()] = client
        self._owns_client: bool = False
    else:
        self._owns_client = True  # we'll create clients lazily

    # store for later use by _loop_client()
    self._default_headers = headers
    self._default_auth = auth

    self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
    self._closed: bool = False
````
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.

### `httpx.AsyncClient` Management
If `client` is not provided, it will be initialized automatically. It is then the user's responsibility to close it at an appropriate time with `await cas.aclose()`, since a class instance cannot know when it will no longer be in use unless explicitly told so.

If you use the `KuboCAS` instance in an `async with` block, the client is closed automatically when the block is exited, which is the approach we suggest:
```python
async with httpx.AsyncClient() as client, KuboCAS(
    rpc_base_url=rpc_base_url,
    gateway_base_url=gateway_base_url,
    client=client,
) as kubo_cas:
    hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True)
    zhs = ZarrHAMTStore(hamt)
    # Use the KuboCAS instance as needed
    # ...
```
As mentioned, if you do not use the `async with` syntax, you should call `await cas.aclose()` when you are done with the instance to ensure that all resources are cleaned up:
```python
cas = KuboCAS(rpc_base_url=rpc_base_url, gateway_base_url=gateway_base_url)
# Use the KuboCAS instance as needed
# ...
await cas.aclose()  # Ensure resources are cleaned up
```
### Authenticated RPC/Gateway Access
Users connecting to an authenticated kubo instance can set whatever headers and auth credentials they need on their own `httpx.AsyncClient` and then pass that in. Alternatively, the `headers` and `auth` constructor parameters will be used to create a new `httpx.AsyncClient` if one is not provided. If you do not need authentication, leave these parameters as `None`.
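For instance, here is a minimal sketch of the constructor-parameter route; the endpoint URLs and token below are placeholders, not real defaults:
```python
# Hypothetical authenticated endpoints and token, for illustration only.
cas = KuboCAS(
    rpc_base_url="https://kubo.example.com:5001",
    gateway_base_url="https://kubo.example.com:8080",
    headers={"Authorization": "Bearer my-secret-token"},
    # or: auth=("username", "password") for HTTP basic auth
)
# ... use the instance ...
await cas.aclose()
```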
### RPC and HTTP Gateway Base URLs
These are the first part of the URL; the provided defaults refer to the endpoints that kubo launches with on a local machine.
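As a sketch, the two calls below should be equivalent when a local daemon runs on kubo's standard ports (the explicit values here assume those well-known defaults rather than quoting the class constants):
```python
# Falls back to the local-daemon defaults when nothing is passed in.
cas_default = KuboCAS()

# Explicit base URLs; note these are just the scheme/host/port — the
# /api/v0 and /ipfs/ path segments are appended internally.
cas_explicit = KuboCAS(
    rpc_base_url="http://127.0.0.1:5001",      # assumed default RPC port
    gateway_base_url="http://127.0.0.1:8080",  # assumed default gateway port
)
```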
The hash function to send to IPFS when storing bytes. Cannot be changed after initialization. The default, blake3, matches the default hash function used by the HAMT.
254 async def aclose(self) -> None: 255 """Close all internally-created clients.""" 256 if not self._owns_client: 257 # User supplied the client; they are responsible for closing it. 258 return 259 260 for client in list(self._client_per_loop.values()): 261 if not client.is_closed: 262 try: 263 await client.aclose() 264 except Exception: 265 # Best-effort cleanup; ignore errors during shutdown 266 pass 267 268 self._client_per_loop.clear() 269 self._closed = True
Close all internally-created clients.
302 async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID: 303 async with self._sem: # throttle RPC 304 # Create multipart form data 305 files = {"file": data} 306 307 # Send the POST request 308 client = self._loop_client() 309 response = await client.post(self.rpc_url, files=files) 310 response.raise_for_status() 311 cid_str: str = response.json()["Hash"] 312 313 cid: CID = CID.decode(cid_str) 314 if cid.codec.code != self.DAG_PB_MARKER: 315 cid = cid.set(codec=codec) 316 return cid
Save data to a storage mechanism, and return an ID for the data in the IPLDKind type. `codec` will be set to "dag-cbor" if this data should be marked as special linked data a la the IPLD data model.
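A brief usage sketch, assuming a running local daemon and that `"raw"` is a valid `CodecInput` value for plain bytes:
```python
# Minimal sketch: store a blob and print its CID.
async def store_blob() -> None:
    async with KuboCAS() as cas:
        cid = await cas.save(b"hello world", codec="raw")
        print(cid)
```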
12class ZarrHAMTStore(zarr.abc.store.Store): 13 """ 14 Write and read Zarr v3s with a HAMT. 15 16 Read **or** write a Zarr-v3 store whose key/value pairs live inside a 17 py-hamt mapping. 18 19 Keys are stored verbatim (``"temp/c/0/0/0"`` → same string in HAMT) and 20 the value is the raw byte payload produced by Zarr. No additional 21 framing, compression, or encryption is applied by this class. For a zarr encryption example 22 see where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb 23 For a fully encrypted zarr store, where metadata is not available, please see 24 :class:`SimpleEncryptedZarrHAMTStore` but we do not recommend using it. 25 26 #### A note about using the same `ZarrHAMTStore` for writing and then reading again 27 If you write a Zarr to a HAMT, and then change it to read only mode, it's best to reinitialize a new ZarrHAMTStore with the proper read only setting. This is because this class, to err on the safe side, will not touch its super class's settings. 28 29 #### Sample Code 30 ```python 31 # --- Write --- 32 ds: xarray.Dataset = # ... 33 cas: ContentAddressedStore = # ... 34 hamt: HAMT = # ... make sure values_are_bytes is True and read_only is False to enable writes 35 hamt = await HAMT.build(cas, values_are_bytes=True) # write-enabled 36 zhs = ZarrHAMTStore(hamt, read_only=False) 37 ds.to_zarr(store=zhs, mode="w", zarr_format=3) 38 await hamt.make_read_only() # flush + freeze 39 root_node_id = hamt.root_node_id 40 print(root_node_id) 41 42 # --- read --- 43 hamt_ro = await HAMT.build( 44 cas, root_node_id=root_cid, read_only=True, values_are_bytes=True 45 ) 46 zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True) 47 ds_ro = xarray.open_zarr(store=zhs_ro) 48 49 50 print(ds_ro) 51 xarray.testing.assert_identical(ds, ds_ro) 52 ``` 53 """ 54 55 def __init__(self, hamt: HAMT, read_only: bool = False) -> None: 56 """ 57 ### `hamt` and `read_only` 58 You need to make sure the following two things are true: 59 60 1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async. 61 2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations. 62 63 ##### A note about the zarr chunk separator, "/" vs "." 64 While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well. 65 66 #### Metadata Read Cache 67 `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data. 68 """ 69 super().__init__(read_only=read_only) 70 71 assert hamt.read_only == read_only 72 assert hamt.values_are_bytes 73 self.hamt: HAMT = hamt 74 """ 75 The internal HAMT. 76 Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID. 
77 """ 78 79 self.metadata_read_cache: dict[str, bytes] = {} 80 """@private""" 81 82 @property 83 def read_only(self) -> bool: 84 """@private""" 85 return self.hamt.read_only 86 87 def __eq__(self, other: object) -> bool: 88 """@private""" 89 if not isinstance(other, ZarrHAMTStore): 90 return False 91 return self.hamt.root_node_id == other.hamt.root_node_id 92 93 async def get( 94 self, 95 key: str, 96 prototype: zarr.core.buffer.BufferPrototype, 97 byte_range: zarr.abc.store.ByteRequest | None = None, 98 ) -> zarr.core.buffer.Buffer | None: 99 """@private""" 100 try: 101 val: bytes 102 # do len check to avoid indexing into overly short strings, 3.12 does not throw errors but we dont know if other versions will 103 is_metadata: bool = ( 104 len(key) >= 9 and key[-9:] == "zarr.json" 105 ) # if path ends with zarr.json 106 107 if is_metadata and key in self.metadata_read_cache: 108 val = self.metadata_read_cache[key] 109 else: 110 val = cast( 111 bytes, await self.hamt.get(key) 112 ) # We know values received will always be bytes since we only store bytes in the HAMT 113 if is_metadata: 114 self.metadata_read_cache[key] = val 115 116 return prototype.buffer.from_bytes(val) 117 except KeyError: 118 # Sometimes zarr queries keys that don't exist anymore, just return nothing on those cases 119 return None 120 121 async def get_partial_values( 122 self, 123 prototype: zarr.core.buffer.BufferPrototype, 124 key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]], 125 ) -> list[zarr.core.buffer.Buffer | None]: 126 """@private""" 127 raise NotImplementedError 128 129 async def exists(self, key: str) -> bool: 130 """@private""" 131 try: 132 await self.hamt.get(key) 133 return True 134 except KeyError: 135 return False 136 137 @property 138 def supports_writes(self) -> bool: 139 """@private""" 140 return not self.hamt.read_only 141 142 @property 143 def supports_partial_writes(self) -> bool: 144 """@private""" 145 return False 146 147 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 148 """@private""" 149 if key in self.metadata_read_cache: 150 self.metadata_read_cache[key] = value.to_bytes() 151 await self.hamt.set(key, value.to_bytes()) 152 153 async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None: 154 """@private""" 155 if not (await self.exists(key)): 156 await self.set(key, value) 157 158 async def set_partial_values( 159 self, key_start_values: Iterable[tuple[str, int, BytesLike]] 160 ) -> None: 161 """@private""" 162 raise NotImplementedError 163 164 @property 165 def supports_deletes(self) -> bool: 166 """@private""" 167 return not self.hamt.read_only 168 169 async def delete(self, key: str) -> None: 170 """@private""" 171 try: 172 await self.hamt.delete(key) 173 # In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo 174 # if key in self.metadata_read_cache: 175 # del self.metadata_read_cache[key] 176 # It's fine if the key was not in the HAMT 177 # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason, probably concurrency issues 178 except KeyError: 179 return 180 181 @property 182 def supports_listing(self) -> bool: 183 """@private""" 184 return True 185 186 async def list(self) -> AsyncIterator[str]: 187 """@private""" 188 async for key in self.hamt.keys(): 189 yield key 190 191 async def list_prefix(self, prefix: str) -> AsyncIterator[str]: 192 """@private""" 193 async for key in self.hamt.keys(): 194 
if key.startswith(prefix): 195 yield key 196 197 async def list_dir(self, prefix: str) -> AsyncIterator[str]: 198 """ 199 @private 200 List *immediate* children that live directly under **prefix**. 201 202 This is similar to :py:meth:`list_prefix` but collapses everything 203 below the first ``"/"`` after *prefix*. Each child name is yielded 204 **exactly once** in the order of first appearance while scanning the 205 HAMT keys. 206 207 Parameters 208 ---------- 209 prefix : str 210 Logical directory path. *Must* end with ``"/"`` for the result to 211 make sense (e.g. ``"a/b/"``). 212 213 Yields 214 ------ 215 str 216 The name of each direct child (file or sub-directory) of *prefix*. 217 218 Examples 219 -------- 220 With keys :: 221 222 a/b/c/d 223 a/b/c/e 224 a/b/f 225 a/b/g/h/i 226 227 ``await list_dir("a/b/")`` produces :: 228 229 c 230 f 231 g 232 233 Notes 234 ----- 235 • Internally uses a :class:`set` to deduplicate names; memory grows 236 with the number of *unique* children, not the total number of keys. 237 • Order is **not** sorted; it reflects the first encounter while 238 iterating over :py:meth:`HAMT.keys`. 239 """ 240 seen_names: set[str] = set() 241 async for key in self.hamt.keys(): 242 if key.startswith(prefix): 243 suffix: str = key[len(prefix) :] 244 first_slash: int = suffix.find("/") 245 if first_slash == -1: 246 if suffix not in seen_names: 247 seen_names.add(suffix) 248 yield suffix 249 else: 250 name: str = suffix[0:first_slash] 251 if name not in seen_names: 252 seen_names.add(name) 253 yield name
Write and read Zarr v3s with a HAMT.

Read **or** write a Zarr-v3 store whose key/value pairs live inside a py-hamt mapping.

Keys are stored verbatim (`"temp/c/0/0/0"` → same string in HAMT) and the value is the raw byte payload produced by Zarr. No additional framing, compression, or encryption is applied by this class. For a zarr encryption example where metadata is available, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
For a fully encrypted zarr store, where metadata is not available, see `SimpleEncryptedZarrHAMTStore`, though we do not recommend using it.

#### A note about using the same `ZarrHAMTStore` for writing and then reading again
If you write a Zarr to a HAMT and then change it to read-only mode, it's best to initialize a new `ZarrHAMTStore` with the proper read-only setting. This is because this class, to err on the safe side, will not touch its superclass's settings.

#### Sample Code
```python
# --- Write ---
ds: xarray.Dataset = ...  # your dataset
cas: ContentAddressedStore = ...  # e.g. a KuboCAS
# values_are_bytes must be True and read_only False to enable writes
hamt = await HAMT.build(cas, values_are_bytes=True)  # write-enabled
zhs = ZarrHAMTStore(hamt, read_only=False)
ds.to_zarr(store=zhs, mode="w", zarr_format=3)
await hamt.make_read_only()  # flush + freeze
root_node_id = hamt.root_node_id
print(root_node_id)

# --- Read ---
hamt_ro = await HAMT.build(
    cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
zhs_ro = ZarrHAMTStore(hamt_ro, read_only=True)
ds_ro = xarray.open_zarr(store=zhs_ro)

print(ds_ro)
xarray.testing.assert_identical(ds, ds_ro)
```
55 def __init__(self, hamt: HAMT, read_only: bool = False) -> None: 56 """ 57 ### `hamt` and `read_only` 58 You need to make sure the following two things are true: 59 60 1. The HAMT is in the same read only mode that you are passing into the Zarr store. This means that `hamt.read_only == read_only`. This is because making a HAMT read only automatically requires async operations, but `__init__` cannot be async. 61 2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations. 62 63 ##### A note about the zarr chunk separator, "/" vs "." 64 While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well. 65 66 #### Metadata Read Cache 67 `ZarrHAMTStore` has an internal read cache for metadata. In practice metadata "zarr.json" files are very very frequently and duplicately requested compared to all other keys, and there are significant speed improvements gotten by implementing this cache. In terms of memory management, in practice this cache does not need an eviction step since "zarr.json" files are much smaller than the memory requirement of the zarr data. 68 """ 69 super().__init__(read_only=read_only) 70 71 assert hamt.read_only == read_only 72 assert hamt.values_are_bytes 73 self.hamt: HAMT = hamt 74 """ 75 The internal HAMT. 76 Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID. 77 """ 78 79 self.metadata_read_cache: dict[str, bytes] = {} 80 """@private"""
### `hamt` and `read_only`
You need to make sure the following two things are true:

1. The HAMT is in the same read-only mode that you are passing into the Zarr store, i.e. `hamt.read_only == read_only`. This is because switching a HAMT to read only requires async operations, but `__init__` cannot be async.
2. The HAMT has `hamt.values_are_bytes == True`. This improves efficiency with Zarr v3 operations.

##### A note about the zarr chunk separator, "/" vs "."
While Zarr v2 used periods by default, Zarr v3 uses forward slashes, and that is assumed here as well.

#### Metadata Read Cache
`ZarrHAMTStore` has an internal read cache for metadata. In practice, "zarr.json" metadata files are requested far more often, and more repetitively, than any other keys, so caching them yields significant speed improvements. In terms of memory management, this cache does not need an eviction step in practice, since "zarr.json" files are much smaller than the memory the zarr data itself requires.
The internal HAMT. Once done with write operations, the hamt can be set to read only mode as usual to get your root node ID.
13class SimpleEncryptedZarrHAMTStore(ZarrHAMTStore): 14 """ 15 Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy. 16 17 This store uses ChaCha20-Poly1305 to encrypt every single key-value pair 18 stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.) 19 and data chunks. This provides strong privacy but means the Zarr store is 20 completely opaque and unusable without the correct encryption key and header. 21 22 Note: For standard zarr encryption and decryption where metadata is available use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb 23 24 #### Encryption Details 25 - Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce). 26 - Requires a 32-byte encryption key and a header. 27 - Each encrypted value includes a 24-byte nonce and a 16-byte tag. 28 29 #### Important Considerations 30 - Since metadata is encrypted, standard Zarr tools cannot inspect the 31 dataset without prior decryption using this store class. 32 - There is no support for partial encryption or excluding variables. 33 - There is no metadata caching. 34 35 #### Sample Code 36 ```python 37 import xarray 38 from py_hamt import HAMT, KuboCAS # Assuming an KuboCAS or similar 39 from Crypto.Random import get_random_bytes 40 import numpy as np 41 42 # Setup 43 ds = xarray.Dataset( 44 {"data": (("y", "x"), np.arange(12).reshape(3, 4))}, 45 coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]} 46 ) 47 cas = KuboCAS() # Example ContentAddressedStore 48 encryption_key = get_random_bytes(32) 49 header = b"fully-encrypted-zarr" 50 51 # --- Write --- 52 hamt_write = await HAMT.build(cas=cas, values_are_bytes=True) 53 ezhs_write = SimpleEncryptedZarrHAMTStore( 54 hamt_write, False, encryption_key, header 55 ) 56 print("Writing fully encrypted Zarr...") 57 ds.to_zarr(store=ezhs_write, mode="w") 58 await hamt_write.make_read_only() 59 root_node_id = hamt_write.root_node_id 60 print(f"Wrote Zarr with root: {root_node_id}") 61 62 # --- Read --- 63 hamt_read = await HAMT.build( 64 cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True 65 ) 66 ezhs_read = SimpleEncryptedZarrHAMTStore( 67 hamt_read, True, encryption_key, header 68 ) 69 print("\nReading fully encrypted Zarr...") 70 ds_read = xarray.open_zarr(store=ezhs_read) 71 print("Read back dataset:") 72 print(ds_read) 73 xarray.testing.assert_identical(ds, ds_read) 74 print("Read successful and data verified.") 75 76 # --- Read with wrong key (demonstrates failure) --- 77 wrong_key = get_random_bytes(32) 78 hamt_bad = await HAMT.build( 79 cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True 80 ) 81 ezhs_bad = SimpleEncryptedZarrHAMTStore( 82 hamt_bad, True, wrong_key, header 83 ) 84 print("\nAttempting to read with wrong key...") 85 try: 86 ds_bad = xarray.open_zarr(store=ezhs_bad) 87 print(ds_bad) 88 except Exception as e: 89 print(f"Failed to read as expected: {type(e).__name__} - {e}") 90 ``` 91 """ 92 93 def __init__( 94 self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes 95 ) -> None: 96 """ 97 Initializes the SimpleEncryptedZarrHAMTStore. 98 99 Args: 100 hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. 101 Its `read_only` status must match the `read_only` argument. 102 read_only: If True, the store is in read-only mode. 103 encryption_key: A 32-byte key for ChaCha20-Poly1305. 
104 header: A header (bytes) used as associated data in encryption. 105 """ 106 super().__init__(hamt, read_only=read_only) 107 108 if len(encryption_key) != 32: 109 raise ValueError("Encryption key must be exactly 32 bytes long.") 110 self.encryption_key = encryption_key 111 self.header = header 112 self.metadata_read_cache: dict[str, bytes] = {} 113 114 def _encrypt(self, val: bytes) -> bytes: 115 """Encrypts data using ChaCha20-Poly1305.""" 116 nonce = get_random_bytes(24) # XChaCha20 uses a 24-byte nonce 117 cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce) 118 cipher.update(self.header) 119 ciphertext, tag = cipher.encrypt_and_digest(val) 120 return nonce + tag + ciphertext 121 122 def _decrypt(self, val: bytes) -> bytes: 123 """Decrypts data using ChaCha20-Poly1305.""" 124 try: 125 # Extract nonce (24), tag (16), and ciphertext 126 nonce, tag, ciphertext = val[:24], val[24:40], val[40:] 127 cipher = ChaCha20_Poly1305.new(key=self.encryption_key, nonce=nonce) 128 cipher.update(self.header) 129 plaintext = cipher.decrypt_and_verify(ciphertext, tag) 130 return plaintext 131 except Exception as e: 132 # Catching a broad exception as various issues (key, tag, length) can occur. 133 raise ValueError( 134 "Decryption failed. Check key, header, or data integrity." 135 ) from e 136 137 def __eq__(self, other: object) -> bool: 138 """@private""" 139 if not isinstance(other, SimpleEncryptedZarrHAMTStore): 140 return False 141 return ( 142 self.hamt.root_node_id == other.hamt.root_node_id 143 and self.encryption_key == other.encryption_key 144 and self.header == other.header 145 ) 146 147 async def get( 148 self, 149 key: str, 150 prototype: zarr.core.buffer.BufferPrototype, 151 byte_range: zarr.abc.store.ByteRequest | None = None, 152 ) -> zarr.core.buffer.Buffer | None: 153 """@private""" 154 try: 155 decrypted_val: bytes 156 is_metadata: bool = ( 157 len(key) >= 9 and key[-9:] == "zarr.json" 158 ) # if path ends with zarr.json 159 160 if is_metadata and key in self.metadata_read_cache: 161 decrypted_val = self.metadata_read_cache[key] 162 else: 163 raw_val = cast( 164 bytes, await self.hamt.get(key) 165 ) # We know values received will always be bytes since we only store bytes in the HAMT 166 decrypted_val = self._decrypt(raw_val) 167 if is_metadata: 168 self.metadata_read_cache[key] = decrypted_val 169 return prototype.buffer.from_bytes(decrypted_val) 170 except KeyError: 171 return None 172 173 async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None: 174 """@private""" 175 if self.read_only: 176 raise Exception("Cannot write to a read-only store.") 177 178 raw_bytes = value.to_bytes() 179 if key in self.metadata_read_cache: 180 self.metadata_read_cache[key] = raw_bytes 181 # Encrypt it 182 encrypted_bytes = self._encrypt(raw_bytes) 183 await self.hamt.set(key, encrypted_bytes)
Write and read Zarr v3s with a HAMT, encrypting *everything* for maximum privacy.

This store uses ChaCha20-Poly1305 to encrypt every single key-value pair stored in the Zarr, including all metadata (`zarr.json`, `.zarray`, etc.) and data chunks. This provides strong privacy but means the Zarr store is completely opaque and unusable without the correct encryption key and header.

Note: For standard zarr encryption and decryption where metadata is available, use the method in https://github.com/dClimate/jupyter-notebooks/blob/main/notebooks/202b%20-%20Encryption%20Example%20(Encryption%20with%20Zarr%20Codecs).ipynb
#### Encryption Details
- Uses XChaCha20_Poly1305 (via pycryptodome's ChaCha20_Poly1305 with a 24-byte nonce).
- Requires a 32-byte encryption key and a header.
- Each encrypted value includes a 24-byte nonce and a 16-byte tag.
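To make that framing concrete, here is a standalone sketch of the same scheme, mirroring the `_encrypt`/`_decrypt` methods in the source above (the plaintext and header are arbitrary examples):
```python
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Random import get_random_bytes

key = get_random_bytes(32)
header = b"example-header"

# Encrypt: the stored value is nonce (24 bytes) || tag (16 bytes) || ciphertext.
nonce = get_random_bytes(24)  # a 24-byte nonce selects XChaCha20-Poly1305
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(header)  # the header is bound as associated data
ciphertext, tag = cipher.encrypt_and_digest(b"example chunk bytes")
value = nonce + tag + ciphertext

# Decrypt: slice the frame apart and verify the tag.
nonce, tag, ciphertext = value[:24], value[24:40], value[40:]
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(header)
assert cipher.decrypt_and_verify(ciphertext, tag) == b"example chunk bytes"
```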
#### Important Considerations
- Since metadata is encrypted, standard Zarr tools cannot inspect the dataset without prior decryption using this store class.
- There is no support for partial encryption or excluding variables.
- There is no metadata caching.
#### Sample Code
```python
import xarray
import numpy as np
from Crypto.Random import get_random_bytes

from py_hamt import HAMT, KuboCAS, SimpleEncryptedZarrHAMTStore

# Setup
ds = xarray.Dataset(
    {"data": (("y", "x"), np.arange(12).reshape(3, 4))},
    coords={"y": [1, 2, 3], "x": [10, 20, 30, 40]},
)
cas = KuboCAS()  # example ContentAddressedStore
encryption_key = get_random_bytes(32)
header = b"fully-encrypted-zarr"

# --- Write ---
hamt_write = await HAMT.build(cas=cas, values_are_bytes=True)
ezhs_write = SimpleEncryptedZarrHAMTStore(
    hamt_write, False, encryption_key, header
)
print("Writing fully encrypted Zarr...")
ds.to_zarr(store=ezhs_write, mode="w")
await hamt_write.make_read_only()
root_node_id = hamt_write.root_node_id
print(f"Wrote Zarr with root: {root_node_id}")

# --- Read ---
hamt_read = await HAMT.build(
    cas=cas, root_node_id=root_node_id, values_are_bytes=True, read_only=True
)
ezhs_read = SimpleEncryptedZarrHAMTStore(
    hamt_read, True, encryption_key, header
)
print("\nReading fully encrypted Zarr...")
ds_read = xarray.open_zarr(store=ezhs_read)
print("Read back dataset:")
print(ds_read)
xarray.testing.assert_identical(ds, ds_read)
print("Read successful and data verified.")

# --- Read with wrong key (demonstrates failure) ---
wrong_key = get_random_bytes(32)
hamt_bad = await HAMT.build(
    cas=cas, root_node_id=root_node_id, read_only=True, values_are_bytes=True
)
ezhs_bad = SimpleEncryptedZarrHAMTStore(
    hamt_bad, True, wrong_key, header
)
print("\nAttempting to read with wrong key...")
try:
    ds_bad = xarray.open_zarr(store=ezhs_bad)
    print(ds_bad)
except Exception as e:
    print(f"Failed to read as expected: {type(e).__name__} - {e}")
```
93 def __init__( 94 self, hamt: HAMT, read_only: bool, encryption_key: bytes, header: bytes 95 ) -> None: 96 """ 97 Initializes the SimpleEncryptedZarrHAMTStore. 98 99 Args: 100 hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. 101 Its `read_only` status must match the `read_only` argument. 102 read_only: If True, the store is in read-only mode. 103 encryption_key: A 32-byte key for ChaCha20-Poly1305. 104 header: A header (bytes) used as associated data in encryption. 105 """ 106 super().__init__(hamt, read_only=read_only) 107 108 if len(encryption_key) != 32: 109 raise ValueError("Encryption key must be exactly 32 bytes long.") 110 self.encryption_key = encryption_key 111 self.header = header 112 self.metadata_read_cache: dict[str, bytes] = {}
Initializes the SimpleEncryptedZarrHAMTStore.
Args:
hamt: The HAMT instance for storage. Must have `values_are_bytes=True`. Its `read_only` status must match the `read_only` argument.
read_only: If True, the store is in read-only mode.
encryption_key: A 32-byte key for ChaCha20-Poly1305.
header: A header (bytes) used as associated data in encryption.