py_hamt
```python
from .hamt import HAMT, blake3_hashfn
from .store import Store, DictStore, IPFSStore
from .zarr_encryption_transformers import create_zarr_encryption_transformers
from .ipfszarr3 import IPFSZarr3

__all__ = [
    "HAMT",
    "blake3_hashfn",
    "Store",
    "DictStore",
    "IPFSStore",
    "create_zarr_encryption_transformers",
    "IPFSZarr3",
]
```
````python
class HAMT(MutableMapping):
    """
    This HAMT presents a key-value interface, like a python dictionary. The only limits are that keys can only be strings, and values can only be types compatible with [IPLDKind](https://dag-cbor.readthedocs.io/en/stable/api/dag_cbor.ipld.html#dag_cbor.ipld.IPLDKind). IPLDKind is a fairly flexible data model, but do note that integers must be within the bounds of a signed 64-bit integer.

    py-hamt uses blake3 with a 32 byte wide hash by default, but to bring your own, read more in the documentation of `blake3_hashfn`.

    # Some notes about thread safety

    Since modifying a HAMT changes all parts of the tree, due to reserializing and saving to the backing store, modifications are not thread safe. Thus, we offer a read-only mode which allows for parallel accesses, or a thread safe write enabled mode. You can see which mode a HAMT is in from the `read_only` variable. HAMTs default to write enabled mode on creation. Calling mutating operations in read only mode will raise Exceptions.

    In write enabled mode, all operations block and wait, so multiple threads can write to the HAMT, but the operations will in reality happen one after the other.

    # transformer_encode, transformer_decode
    `transformer_encode` and `transformer_decode` are functions that are called right before sending raw data to the store, after it has been encoded with dag_cbor. They get access to the key that the value originally corresponded to.

    HAMT also sometimes sends its internal data structure objects to the store; those will not pass through these functions.

    # dunder method documentation
    These are not generated by pdoc automatically so we are including their documentation here.

    ## `__len__`
    Total number of keys. Note that this will have to scan the entire tree in order to count, which can take a while depending on the speed of store retrieval.

    ## `__iter__`
    Generator of all string keys. When initially called, this will freeze the root node and then start iteration, so subsequent sets and deletes that mutate the HAMT will not be reflected in the keys returned by this iteration.

    ## `__deepcopy__`
    For use with the python copy module. This creates a deep copy of the current HAMT. The only thing it does not copy over is the cache.
    ```python
    from copy import deepcopy
    hamt = ...  # some existing HAMT
    copy_hamt = deepcopy(hamt)
    ```
    """

    store: Store
    """@private"""
    # Every call of this will look like self.hash_fn() and so the first argument will always take a self argument
    hash_fn: Callable[[bytes], bytes]
    """@private"""
    # Only important for writing, when reading this will use buckets even if they are overly big
    max_bucket_size: int
    """@private"""

    # Don't use the type alias here since this is exposed in the documentation
    root_node_id: IPLDKind
    """DO NOT modify this directly.

    This is the ID that the store returns for the root node of the HAMT. This is exposed since it is sometimes useful to reconstruct other HAMTs later if using a persistent store.

    This is really type IPLDKind, but the documentation generates this strange type instead since IPLDKind is a type union.
    """

    lock: Lock
    """
    @private
    For use in multithreading
    """

    read_only: bool
    """
    DO NOT modify this directly. This is here for you to read and check.

    You can modify the read status of a HAMT through the `make_read_only` or `enable_write` functions, so that the HAMT will block on making a change until all mutating operations are done.
    """

    transformer_encode: None | Callable[[str, bytes], bytes]
    """This function is called to transform the raw bytes of a value after it is encoded with dag_cbor but before it gets sent to the Store."""
    transformer_decode: None | Callable[[str, bytes], bytes]
    """This function is called to transform the raw bytes of a value after it is retrieved from the store but before decoding with dag_cbor to python IPLDKind type objects."""

    cache: dict[StoreID, Node]
    """@private"""
    max_cache_size_bytes: int
    """The maximum size of the internal cache of nodes. We expect that some Stores will be high latency, so it is useful to cache parts of the data structure.

    Set to 0 if you want no cache at all."""

    # We rely on python 3.7+'s dicts which keep insertion order of elements to do a basic LRU
    def cache_eviction_lru(self):
        while getsizeof(self.cache) > self.max_cache_size_bytes:
            if len(self.cache) == 0:
                return
            stalest_node_id = next(iter(self.cache.keys()))
            del self.cache[stalest_node_id]

    def write_node(self, node: Node) -> IPLDKind:
        """@private"""
        node_id = self.store.save_dag_cbor(node.serialize())
        self.cache[node_id] = node
        self.cache_eviction_lru()
        return node_id

    def read_node(self, node_id: IPLDKind) -> Node:
        """@private"""
        node: Node
        # cache hit
        if node_id in self.cache:
            node = self.cache[node_id]
            # Reinsert to put this key as the most recently used and last in the dict insertion order
            del self.cache[node_id]
            self.cache[node_id] = node
        # cache miss
        else:
            node = Node.deserialize(self.store.load(node_id))
            self.cache[node_id] = node
            self.cache_eviction_lru()

        return node

    def __init__(
        self,
        store: Store,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        read_only: bool = False,
        root_node_id: IPLDKind = None,
        max_cache_size_bytes=10_000_000,  # default to 10 megabytes
        transformer_encode: None | Callable[[str, bytes], bytes] = None,
        transformer_decode: None | Callable[[str, bytes], bytes] = None,
    ):
        self.store = store
        self.hash_fn = hash_fn

        self.cache = {}
        self.max_cache_size_bytes = max_cache_size_bytes

        self.max_bucket_size = 4
        self.read_only = read_only
        self.lock = Lock()

        self.transformer_encode = transformer_encode
        self.transformer_decode = transformer_decode

        if root_node_id is None:
            root_node = Node()
            self.root_node_id = self.write_node(root_node)
        else:
            self.root_node_id = root_node_id
            # Make sure the cache has our root node
            self.read_node(self.root_node_id)

    # dunder for the python deepcopy module
    def __deepcopy__(self, memo) -> "HAMT":
        if not self.read_only:
            self.lock.acquire(blocking=True)

        copy_hamt = HAMT(
            store=self.store,
            hash_fn=self.hash_fn,
            read_only=self.read_only,
            root_node_id=self.root_node_id,
            transformer_encode=self.transformer_encode,
            transformer_decode=self.transformer_decode,
        )

        if not self.read_only:
            self.lock.release()

        return copy_hamt

    def make_read_only(self):
        """Disables all mutation of this HAMT. Once enabled, the HAMT will throw errors if set or delete are called.

        When a HAMT is in read-only mode, it allows for safe multithreaded reads, increasing performance."""
        self.lock.acquire(blocking=True)
        self.read_only = True
        self.lock.release()

    def enable_write(self):
        self.lock.acquire(blocking=True)
        self.read_only = False
        self.lock.release()

    def _reserialize_and_link(self, node_stack: list[tuple[Link, Node]]):
        """
        For internal use
        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store
        Takes a stack of nodes, we represent a stack with a list where the first element is the root element and the last element is the top of the stack
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python
        If a node ends up being empty, then it is deleted entirely, unless it is the root node
        Modifies in place
        """
        # Iterate in the reverse direction, imitating going deeper into a stack
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_store_id, node = node_stack[stack_index]

            # If this node is empty, and it's not the root node, then we can delete it entirely from the list
            buckets = node.get_buckets()
            links = node.get_links()
            is_empty = buckets == {} and links == {}
            is_not_root = stack_index > 0

            if is_empty and is_not_root:
                # Unlink from the rest of the tree
                _, prev_node = node_stack[stack_index - 1]
                prev_node._remove_link(old_store_id)

                # Remove from our stack
                node_stack.pop(stack_index)
            else:
                # Reserialize
                new_store_id = self.write_node(node)
                node_stack[stack_index] = (new_store_id, node)

                # If this is not the last i.e. root node, we need to change the linking of the node prior in the list
                if stack_index > 0:
                    _, prev_node = node_stack[stack_index - 1]
                    prev_node._replace_link(old_store_id, new_store_id)

    def __setitem__(self, key_to_insert: str, val_to_insert: IPLDKind):
        if self.read_only:
            raise Exception("Cannot call set on a read only HAMT")

        if not self.read_only:
            self.lock.acquire(blocking=True)

        val = dag_cbor.encode(val_to_insert)
        if self.transformer_encode is not None:
            val = self.transformer_encode(key_to_insert, val)
        val_ptr = self.store.save_raw(val)

        node_stack: list[tuple[Link, Node]] = []
        root_node = self.read_node(self.root_node_id)
        node_stack.append((self.root_node_id, root_node))

        # FIFO queue to keep track of all the KVs we need to insert
        # This is important for if any buckets overflow and thus we need to reinsert all those KVs
        kvs_queue: list[tuple[str, IPLDKind]] = []
        kvs_queue.append((key_to_insert, val_ptr))

        created_change = False
        # Keep iterating until we have no more KVs to insert
        while len(kvs_queue) != 0:
            _, top_node = node_stack[-1]
            curr_key, curr_val = kvs_queue[0]

            raw_hash = self.hash_fn(curr_key.encode())
            map_key = str(extract_bits(raw_hash, len(node_stack) - 1, 8))

            buckets = top_node.get_buckets()
            links = top_node.get_links()

            if map_key in links and map_key in buckets:
                self.lock.release()
                raise Exception(
                    "Key in both buckets and links of the node, invariant violated"
                )
            elif map_key in links:
                next_node_id = links[map_key]
                next_node = self.read_node(next_node_id)
                node_stack.append((next_node_id, next_node))
            elif map_key in buckets:
                bucket = buckets[map_key]
                created_change = True

                # If this bucket already has this same key, just rewrite the value
                # Since we can't use continues to go back to the top of the while loop, use this boolean flag instead
                should_continue_at_while = False
                for kv in bucket:
                    if curr_key in kv:
                        kv[curr_key] = curr_val
                        kvs_queue.pop(0)
                        should_continue_at_while = True
                        break
                if should_continue_at_while:
                    continue

                bucket_has_space = len(bucket) < self.max_bucket_size
                if bucket_has_space:
                    bucket.append({curr_key: curr_val})
                    kvs_queue.pop(0)
                # If the bucket is full and we need to add, then all these KVs need to be taken out of this bucket and reinserted throughout the tree
                else:
                    # Empty the bucket of KVs into the queue
                    for kv in bucket:
                        for k, v in kv.items():
                            kvs_queue.append((k, v))

                    # Delete empty bucket, there should only be a link now
                    del buckets[map_key]

                    # Create a new link to a new node so that we can reflow these KVs into new buckets
                    new_node = Node()
                    new_node_id = self.write_node(new_node)

                    links[map_key] = new_node_id

                    # We need to rerun from the top with the new queue, but this time this node will have a link to put KVs deeper down in the tree
                    node_stack.append((new_node_id, new_node))
            else:
                # If there is no link and no bucket, then we can create a new bucket, insert, and be done with this key
                bucket: list[dict[str, IPLDKind]] = []
                bucket.append({curr_key: curr_val})
                kvs_queue.pop(0)
                buckets[map_key] = bucket
                created_change = True

        # Finally, reserialize and fix all links, deleting empty nodes as needed
        if created_change:
            self._reserialize_and_link(node_stack)
            node_stack_top_id = node_stack[0][0]
            self.root_node_id = node_stack_top_id

        if not self.read_only:
            self.lock.release()

    def __delitem__(self, key: str):
        if self.read_only:
            raise Exception("Cannot call delete on a read only HAMT")

        if not self.read_only:
            self.lock.acquire(blocking=True)

        raw_hash = self.hash_fn(key.encode())

        node_stack: list[tuple[Link, Node]] = []
        root_node = self.read_node(self.root_node_id)
        node_stack.append((self.root_node_id, root_node))

        created_change = False
        while True:
            top_id, top_node = node_stack[-1]
            map_key = str(extract_bits(raw_hash, len(node_stack) - 1, 8))

            buckets = top_node.get_buckets()
            links = top_node.get_links()

            if map_key in buckets and map_key in links:
                self.lock.release()
                raise Exception(
                    "Key in both buckets and links of the node, invariant violated"
                )
            elif map_key in buckets:
                bucket = buckets[map_key]

                # Delete from within this bucket
                for bucket_index in range(len(bucket)):
                    kv = bucket[bucket_index]
                    if key in kv:
                        created_change = True
                        bucket.pop(bucket_index)

                        # If this bucket becomes empty then delete this dict entry for the bucket
                        if len(bucket) == 0:
                            del buckets[map_key]

                        # This must be done to avoid IndexErrors after continuing to iterate since the length of the bucket has now changed
                        break

                break
            elif map_key in links:
                link = links[map_key]
                next_node = self.read_node(link)
                node_stack.append((link, next_node))
                continue

            else:
                # This key is not even in the HAMT so just exit
                break

        # Finally, reserialize and fix all links, deleting empty nodes as needed
        if created_change:
            self._reserialize_and_link(node_stack)
            node_stack_top_id = node_stack[0][0]
            self.root_node_id = node_stack_top_id

        if not self.read_only:
            self.lock.release()

        if not created_change:
            raise KeyError

    def __getitem__(self, key: str) -> IPLDKind:
        if not self.read_only:
            self.lock.acquire(blocking=True)

        raw_hash = self.hash_fn(key.encode())

        node_id_stack: list[Link] = []
        node_id_stack.append(self.root_node_id)

        # Don't check if result is none but use a boolean to indicate finding something, this is because None is a possible value the HAMT can store
        result_ptr: IPLDKind = None
        found_a_result: bool = False
        while True:
            top_id = node_id_stack[-1]
            top_node = self.read_node(top_id)
            map_key = str(extract_bits(raw_hash, len(node_id_stack) - 1, 8))

            # Check if this node is in one of the buckets
            buckets = top_node.get_buckets()
            if map_key in buckets:
                bucket = buckets[map_key]
                for kv in bucket:
                    if key in kv:
                        result_ptr = kv[key]
                        found_a_result = True
                        break

            # If it isn't in one of the buckets, check if there's a link to another serialized node id
            links = top_node.get_links()
            if map_key in links:
                link_id = links[map_key]
                node_id_stack.append(link_id)
                continue
            # Nowhere left to go, stop walking down the tree
            else:
                break

        if not self.read_only:
            self.lock.release()

        if not found_a_result:
            raise KeyError

        result_bytes = self.store.load(result_ptr)
        if self.transformer_decode is not None:
            result_bytes = self.transformer_decode(key, result_bytes)
        return dag_cbor.decode(result_bytes)

    def __len__(self) -> int:
        key_count = 0
        for _ in self:
            key_count += 1

        return key_count

    def __iter__(self):
        if not self.read_only:
            self.lock.acquire(blocking=True)

        node_id_stack = []
        node_id_stack.append(self.root_node_id)

        if not self.read_only:
            self.lock.release()

        while True:
            if len(node_id_stack) == 0:
                break

            top_id = node_id_stack.pop()
            node = self.read_node(top_id)

            # Collect all keys from all buckets
            buckets = node.get_buckets()
            for bucket in buckets.values():
                for kv in bucket:
                    for k in kv:
                        yield k

            # Traverse down list of links
            links = node.get_links()
            for link in links.values():
                node_id_stack.append(link)

    def ids(self):
        """Generator of all IDs the backing store uses"""
        if not self.read_only:
            self.lock.acquire(blocking=True)

        node_id_stack: list[Link] = []
        node_id_stack.append(self.root_node_id)

        if not self.read_only:
            self.lock.release()

        while len(node_id_stack) > 0:
            top_id = node_id_stack.pop()
            yield top_id
            top_node = self.read_node(top_id)

            buckets = top_node.get_buckets()
            for bucket in buckets.values():
                for kv in bucket:
                    for v in kv.values():
                        yield v

            # Traverse down list of ids that are the store's links
            links = top_node.get_links()
            for link in links.values():
                node_id_stack.append(link)
````
This HAMT presents a key-value interface, like a python dictionary. The only limits are that keys can only be strings, and values can only be types compatible with IPLDKind. IPLDKind is a fairly flexible data model, but do note that integers must be within the bounds of a signed 64-bit integer.

py-hamt uses blake3 with a 32 byte wide hash by default, but to bring your own, read more in the documentation of `blake3_hashfn`.

Some notes about thread safety

Since modifying a HAMT changes all parts of the tree, due to reserializing and saving to the backing store, modifications are not thread safe. Thus, we offer a read-only mode which allows for parallel accesses, or a thread safe write enabled mode. You can see which mode a HAMT is in from the `read_only` variable. HAMTs default to write enabled mode on creation. Calling mutating operations in read only mode will raise Exceptions.

In write enabled mode, all operations block and wait, so multiple threads can write to the HAMT, but the operations will in reality happen one after the other.
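As a minimal sketch of the read-only pattern described above, using the in-memory `DictStore` (the keys and thread pool setup here are only for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

from py_hamt import HAMT, DictStore

hamt = HAMT(store=DictStore())
for i in range(100):
    hamt[f"key{i}"] = i

# Freeze the HAMT so that many threads can read it at once
hamt.make_read_only()
keys = list(hamt)
with ThreadPoolExecutor() as pool:
    values = list(pool.map(lambda k: hamt[k], keys))
assert sorted(values) == list(range(100))

# Switch back to write enabled mode before mutating again
hamt.enable_write()
hamt["another key"] = "another value"
```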
transformer_encode, transformer_decode

`transformer_encode` and `transformer_decode` are functions that are called right before sending raw data to the store, after it has been encoded with dag_cbor. They get access to the key that the value originally corresponded to.

HAMT also sometimes sends its internal data structure objects to the store; those will not pass through these functions.
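For instance, a hypothetical pair of transformers that compress values with zlib could look like the sketch below; the function names and the choice of compression are illustrative, not part of py-hamt:

```python
import zlib

from py_hamt import HAMT, DictStore

def compress(key: str, raw: bytes) -> bytes:
    # `key` is available for per-key decisions; this sketch compresses everything
    return zlib.compress(raw)

def decompress(key: str, raw: bytes) -> bytes:
    return zlib.decompress(raw)

hamt = HAMT(
    store=DictStore(), transformer_encode=compress, transformer_decode=decompress
)
hamt["foo"] = "bar" * 1000
assert hamt["foo"] == "bar" * 1000
```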
dunder method documentation

These are not generated by pdoc automatically so we are including their documentation here.

__len__

Total number of keys. Note that this will have to scan the entire tree in order to count, which can take a while depending on the speed of store retrieval.

__iter__

Generator of all string keys. When initially called, this will freeze the root node and then start iteration, so subsequent sets and deletes that mutate the HAMT will not be reflected in the keys returned by this iteration.

__deepcopy__

For use with the python copy module. This creates a deep copy of the current HAMT. The only thing it does not copy over is the cache.

```python
from copy import deepcopy
hamt = ...  # some existing HAMT
copy_hamt = deepcopy(hamt)
```
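A minimal usage sketch of the dictionary interface, using the in-memory `DictStore`; the keys and values are made up for illustration:

```python
from py_hamt import HAMT, DictStore

hamt = HAMT(store=DictStore())
hamt["temperature"] = 72
hamt["city"] = "Reykjavik"

print(hamt["temperature"])  # 72
print(len(hamt))            # 2
print(sorted(hamt))         # ['city', 'temperature']

del hamt["city"]
assert "city" not in hamt
```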
root_node_id

DO NOT modify this directly.

This is the ID that the store returns for the root node of the HAMT. This is exposed since it is sometimes useful to reconstruct other HAMTs later if using a persistent store.

This is really of type IPLDKind, but the documentation generates a strange-looking type instead since IPLDKind is a type union.
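For example, a sketch of reopening a HAMT from a saved root ID, assuming a local IPFS daemon is running and `IPFSStore` is used as the persistent store:

```python
from py_hamt import HAMT, IPFSStore

# Write some data, then remember the root ID (a CID when using IPFSStore)
hamt = HAMT(store=IPFSStore())
hamt["foo"] = "bar"
root = hamt.root_node_id

# Later, or in another process, reconstruct a read-only view from that root
reopened = HAMT(store=IPFSStore(), root_node_id=root, read_only=True)
assert reopened["foo"] == "bar"
```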
read_only

DO NOT modify this directly. This is here for you to read and check.

You can change the read status of a HAMT through the `make_read_only` or `enable_write` functions, so that the HAMT will block on making a change until all mutating operations are done.
transformer_encode

This function is called to transform the raw bytes of a value after it is encoded with dag_cbor but before it gets sent to the Store.
transformer_decode

This function is called to transform the raw bytes of a value after it is retrieved from the store but before it is decoded with dag_cbor into python IPLDKind objects.
max_cache_size_bytes

The maximum size of the internal cache of nodes. We expect that some Stores will be high latency, so it is useful to cache parts of the data structure.

Set to 0 if you want no cache at all.
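A small sketch of tuning the cache at construction time; the sizes shown are arbitrary:

```python
from py_hamt import HAMT, DictStore

# Keep roughly 1 MB of nodes cached
hamt = HAMT(store=DictStore(), max_cache_size_bytes=1_000_000)

# Or disable the cache entirely, trading speed for memory
uncached = HAMT(store=DictStore(), max_cache_size_bytes=0)
```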
make_read_only

Disables all mutation of this HAMT. Once enabled, the HAMT will throw errors if set or delete are called. When a HAMT is in read-only mode, it allows for safe multithreaded reads, increasing performance.
ids

Generator of all IDs the backing store uses, covering both internal nodes and stored values.
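A short sketch of walking these IDs; with `IPFSStore` they would be CIDs, which can be useful for things like pinning every block of a HAMT (the keys here are only for illustration):

```python
from py_hamt import HAMT, DictStore

hamt = HAMT(store=DictStore())
for i in range(10):
    hamt[str(i)] = i

# Every ID the backing store holds for this HAMT: internal nodes and value pointers
all_ids = list(hamt.ids())
print(len(all_ids))
```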
```python
def blake3_hashfn(input_bytes: bytes) -> bytes:
    """
    This is provided as a default recommended hash function by py-hamt. It uses the blake3 hash function with 32 bytes as the hash size.

    To bring your own hash function, just create a function that takes in bytes and returns the hash bytes, and use that in the HAMT init method.

    It's important to note that the resulting hash must always be a multiple of 8 bits, since a python bytes object can only represent whole bytes.
    """
    # 32 bytes is the recommended byte size for blake3 and the default, but multihash forces us to explicitly specify
    digest = b3.digest(input_bytes, size=32)
    raw_bytes = b3.unwrap(digest)
    return raw_bytes
```
This is provided as a default recommended hash function by py-hamt. It uses the blake3 hash function with 32 bytes as the hash size.

To bring your own hash function, just create a function that takes in bytes and returns the hash bytes, and use that in the HAMT init method.

It's important to note that the resulting hash must always be a multiple of 8 bits, since a python bytes object can only represent whole bytes.
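As a sketch, a custom hash function based on sha256 from the standard library could be plugged in like this; sha256 here is just an example choice, not a py-hamt recommendation:

```python
import hashlib

from py_hamt import HAMT, DictStore

def sha256_hashfn(input_bytes: bytes) -> bytes:
    # 32 bytes, i.e. a whole number of bytes, as required
    return hashlib.sha256(input_bytes).digest()

hamt = HAMT(store=DictStore(), hash_fn=sha256_hashfn)
hamt["foo"] = "bar"
assert hamt["foo"] == "bar"
```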
```python
class Store(ABC):
    """Abstract class that represents a storage mechanism the HAMT can use for keeping data.

    The return type of save and input to load is really type IPLDKind, but the documentation generates something a bit strange since IPLDKind is a type union.
    """

    @abstractmethod
    def save_raw(self, data: bytes) -> IPLDKind:
        """Take any set of bytes, save it to the storage mechanism, and return an ID in the type of IPLDKind that can be used to retrieve those bytes later."""

    @abstractmethod
    def save_dag_cbor(self, data: bytes) -> IPLDKind:
        """Take a set of bytes and save it just like `save_raw`, except this method has additional context that the data is in a dag-cbor format."""

    @abstractmethod
    def load(self, id: IPLDKind) -> bytes:
        """Retrieve the bytes based on an ID returned earlier by the save function."""
```
Abstract class that represents a storage mechanism the HAMT can use for keeping data.
The return type of save and input to load is really type IPLDKind, but the documentation generates something a bit strange since IPLDKind is a type union.
save_raw

Take any set of bytes, save it to the storage mechanism, and return an ID of type IPLDKind that can be used to retrieve those bytes later.
save_dag_cbor

Take a set of bytes and save it just like `save_raw`, except this method has the additional context that the data is in dag-cbor format.
load

Retrieve the bytes based on an ID returned earlier by the save function.
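To illustrate the Store interface, here is a sketch of a minimal custom store that keys blocks by their sha256 digest; the class name and hashing choice are hypothetical, and this is essentially what `DictStore` already does with blake3:

```python
import hashlib

from py_hamt import HAMT, Store

class Sha256MemoryStore(Store):
    """Hypothetical store that keeps blocks in a dict keyed by their sha256 digest."""

    def __init__(self):
        self.blocks: dict[bytes, bytes] = {}

    def save_raw(self, data: bytes) -> bytes:
        digest = hashlib.sha256(data).digest()
        self.blocks[digest] = data
        return digest

    def save_dag_cbor(self, data: bytes) -> bytes:
        # This store does not need to treat dag-cbor data specially
        return self.save_raw(data)

    def load(self, id: bytes) -> bytes:
        return self.blocks[id]

hamt = HAMT(store=Sha256MemoryStore())
hamt["foo"] = "bar"
assert hamt["foo"] == "bar"
```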
```python
class DictStore(Store):
    """A basic implementation of a backing store, mostly for demonstration and testing purposes. It hashes all inputs and uses that as a key to an in-memory python dict. The hash bytes are the ID that `save` returns and `load` takes in."""

    store: dict[bytes, bytes]
    """@private"""
    hash_alg: Multihash
    """@private"""

    def __init__(self):
        self.store = {}
        self.hash_alg = multihash.get("blake3")

    def save(self, data: bytes) -> bytes:
        hash = self.hash_alg.digest(data, size=32)
        self.store[hash] = data
        return hash

    def save_raw(self, data: bytes) -> bytes:
        """"""
        return self.save(data)

    def save_dag_cbor(self, data: bytes) -> bytes:
        """"""
        return self.save(data)

    # Ignore the type error since bytes is in the IPLDKind type
    def load(self, id: bytes) -> bytes:  # type: ignore
        """"""
        if id in self.store:
            return self.store[id]
        else:
            raise Exception("ID not found in store")
```
A basic implementation of a backing store, mostly for demonstration and testing purposes. It hashes all inputs and uses that as a key to an in-memory python dict. The hash bytes are the ID that `save` returns and `load` takes in.
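A quick round trip through `DictStore` directly, outside of a HAMT:

```python
from py_hamt import DictStore

store = DictStore()
block_id = store.save_raw(b"hello")  # the blake3 hash bytes of the data
assert store.load(block_id) == b"hello"
```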
````python
class IPFSStore(Store):
    """
    Use IPFS as a backing store for a HAMT. The IDs returned from save and used by load are IPFS CIDs.

    Save methods use the RPC API but `load` uses the HTTP Gateway, so read-only HAMTs will only access the HTTP Gateway. This allows for connection to remote gateways as well.

    You can write to an authenticated IPFS node by providing credentials in the constructor. The following authentication methods are supported:
    - Basic Authentication: Provide a tuple of (username, password) to the `basic_auth` parameter.
    - Bearer Token: Provide a bearer token to the `bearer_token` parameter.
    - API Key: Provide an API key to the `api_key` parameter. You can customize the header name for the API key by setting the `api_key_header` parameter.
    """

    def __init__(
        self,
        timeout_seconds: int = 30,
        gateway_uri_stem: str = "http://127.0.0.1:8080",
        rpc_uri_stem: str = "http://127.0.0.1:5001",
        hasher: str = "blake3",
        pin_on_add: bool = False,
        debug: bool = False,
        # Authentication parameters
        basic_auth: tuple[str, str] | None = None,  # (username, password)
        bearer_token: str | None = None,
        api_key: str | None = None,
        api_key_header: str = "X-API-Key",  # Customizable API key header
    ):
        self.timeout_seconds = timeout_seconds
        """
        You can modify this variable directly if you choose.

        This sets the timeout in seconds for all HTTP requests.
        """
        self.gateway_uri_stem = gateway_uri_stem
        """
        URI stem of the IPFS HTTP gateway that IPFSStore will retrieve blocks from.
        """
        self.rpc_uri_stem = rpc_uri_stem
        """URI stem of the IPFS RPC API that IPFSStore will send data to."""
        self.hasher = hasher
        """The hash function to send to IPFS when storing bytes."""
        self.pin_on_add: bool = pin_on_add
        """Whether IPFSStore should tell the daemon to pin the generated CIDs in API calls. This can be changed in between usages, but should be kept the same value for the lifetime of the program."""
        self.debug: bool = debug
        """If true, this records the total number of bytes sent in and out of IPFSStore to the network. You can access this information in `total_sent` and `total_received`. Bytes are counted in terms of either how much was sent to IPFS to store a CID, or how much data was inside of a retrieved IPFS block. This does not include the overhead of the HTTP requests themselves."""
        self.total_sent: None | int = 0 if debug else None
        """Total bytes sent to IPFS. Used for debugging purposes."""
        self.total_received: None | int = 0 if debug else None
        """Total bytes in responses from IPFS for blocks. Used for debugging purposes."""

        # Authentication settings
        self.basic_auth = basic_auth
        """Tuple of (username, password) for Basic Authentication"""
        self.bearer_token = bearer_token
        """Bearer token for token-based authentication"""
        self.api_key = api_key
        """API key for API key-based authentication"""
        self.api_key_header = api_key_header
        """Header name to use for API key authentication"""

    def save(self, data: bytes, cid_codec: str) -> CID:
        """
        This saves the data to an IPFS daemon by calling the RPC API, and then returns the CID, with a multicodec set by the input cid_codec. We need to do this since the API always returns either a multicodec of raw, or dag-pb if it had to shard the input data.

        `save` asks the daemon to pin the content it adds only if `pin_on_add` was set to True in the constructor.

        ```python
        from py_hamt import IPFSStore

        ipfs_store = IPFSStore()
        cid = ipfs_store.save("foo".encode(), "raw")
        print(cid.human_readable)
        ```
        """
        pin_string: str = "true" if self.pin_on_add else "false"

        # Apply authentication based on provided credentials
        headers = {}
        if self.bearer_token:
            headers["Authorization"] = f"Bearer {self.bearer_token}"
        elif self.api_key:
            headers[self.api_key_header] = self.api_key

        # Prepare request parameters
        url = f"{self.rpc_uri_stem}/api/v0/add?hash={self.hasher}&pin={pin_string}"

        # Make the request with appropriate authentication
        response = requests.post(
            url,
            files={"file": data},
            headers=headers,
            auth=self.basic_auth,
            timeout=self.timeout_seconds,
        )
        response.raise_for_status()

        cid_str: str = json.decode(response.content)["Hash"]  # type: ignore
        cid = CID.decode(cid_str)
        # If it's dag-pb it means we should not reset the cid codec, since this is a UnixFS entry for a large amount of data that thus had to be sharded
        # We don't worry about HAMT nodes being larger than 1 MB
        # With a conservative calculation of 256 map keys * 10 (bucket size of 9 and 1 link per map key) * 100 bytes huge size for a cid = 0.256 MB, so we can always safely recodec those as dag-cbor, which is what they are
        # 0x70 means dag-pb
        if cid.codec.code != 0x70:
            cid = cid.set(codec=cid_codec)

        # if everything is successful, record debug information
        if self.debug:
            self.total_sent += len(data)  # type: ignore

        return cid

    def save_raw(self, data: bytes) -> CID:
        """See `save`"""
        return self.save(data, "raw")

    def save_dag_cbor(self, data: bytes) -> CID:
        """See `save`"""
        return self.save(data, "dag-cbor")

    # Ignore the type error since CID is in the IPLDKind type
    def load(self, id: CID) -> bytes:  # type: ignore
        """
        This retrieves the raw bytes by calling the provided HTTP gateway.
        ```python
        from py_hamt import IPFSStore
        from multiformats import CID

        ipfs_store = IPFSStore()
        # This is just an example CID
        cid = CID.decode("bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze")
        data = ipfs_store.load(cid)
        print(data)
        ```
        """
        response = requests.get(
            f"{self.gateway_uri_stem}/ipfs/{str(id)}", timeout=self.timeout_seconds
        )
        response.raise_for_status()

        if self.debug:
            self.total_received += len(response.content)  # type: ignore

        return response.content
````
Use IPFS as a backing store for a HAMT. The IDs returned from save and used by load are IPFS CIDs.

Save methods use the RPC API but `load` uses the HTTP Gateway, so read-only HAMTs will only access the HTTP Gateway. This allows for connection to remote gateways as well.
You can write to an authenticated IPFS node by providing credentials in the constructor. The following authentication methods are supported:

- Basic Authentication: Provide a tuple of (username, password) to the `basic_auth` parameter.
- Bearer Token: Provide a bearer token to the `bearer_token` parameter.
- API Key: Provide an API key to the `api_key` parameter. You can customize the header name for the API key by setting the `api_key_header` parameter.
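A sketch of constructing authenticated stores; the endpoints and credentials below are placeholders, not real values:

```python
from py_hamt import IPFSStore

# Basic Authentication against a hypothetical protected node
basic_store = IPFSStore(
    rpc_uri_stem="https://ipfs.example.com:5001",
    gateway_uri_stem="https://ipfs.example.com:8080",
    basic_auth=("username", "password"),
)

# Bearer token or API key instead
bearer_store = IPFSStore(bearer_token="some-token")
api_key_store = IPFSStore(api_key="some-api-key", api_key_header="X-API-Key")
```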
timeout_seconds

You can modify this variable directly if you choose.

This sets the timeout in seconds for all HTTP requests.
pin_on_add

Whether IPFSStore should tell the daemon to pin the generated CIDs in API calls. This can be changed in between usages, but should be kept the same value for the lifetime of the program.
debug

If true, this records the total number of bytes sent in and out of IPFSStore to the network. You can access this information in `total_sent` and `total_received`. Bytes are counted in terms of either how much was sent to IPFS to store a CID, or how much data was inside of a retrieved IPFS block. This does not include the overhead of the HTTP requests themselves.
total_received

Total bytes in responses from IPFS for blocks. Used for debugging purposes.
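A sketch of reading these debug counters, assuming a local IPFS daemon is reachable at the default addresses:

```python
from py_hamt import HAMT, IPFSStore

store = IPFSStore(debug=True)
hamt = HAMT(store=store)
hamt["foo"] = "bar"
_ = hamt["foo"]

# Bytes sent to and received from the daemon so far
print(store.total_sent, store.total_received)
```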
save

This saves the data to an IPFS daemon by calling the RPC API, and then returns the CID, with a multicodec set by the input cid_codec. We need to do this since the API always returns either a multicodec of raw, or dag-pb if it had to shard the input data.

`save` asks the daemon to pin the content it adds only if `pin_on_add` was set to True in the constructor.

```python
from py_hamt import IPFSStore

ipfs_store = IPFSStore()
cid = ipfs_store.save("foo".encode(), "raw")
print(cid.human_readable)
```
save_dag_cbor

See `save`.
load

This retrieves the raw bytes by calling the provided HTTP gateway.

```python
from py_hamt import IPFSStore
from multiformats import CID

ipfs_store = IPFSStore()
# This is just an example CID
cid = CID.decode("bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze")
data = ipfs_store.load(cid)
print(data)
```
````python
def create_zarr_encryption_transformers(
    encryption_key: bytes,
    header: bytes,
    exclude_vars: list[str] = [],
) -> tuple[TransformerFN, TransformerFN]:
    """
    Uses XChaCha20_Poly1305 from the pycryptodome library to perform encryption, while ignoring zarr metadata files.

    https://pycryptodome.readthedocs.io/en/latest/src/cipher/chacha20_poly1305.html

    Note that the encryption key must always be 32 bytes long. A header is required by the underlying encryption algorithm. Every time a zarr chunk is encrypted, a random 24-byte nonce is generated. This is saved with the chunk for use when reading back.

    zarr.json metadata files in a zarr v3 are always ignored, to allow for calculating an encrypted zarr's structure without having the encryption key.

    With `exclude_vars` you may also set some variables to be unencrypted. This allows for partially encrypted zarrs which can be loaded into xarray, but the values of encrypted variables cannot be accessed (errors will be thrown). You should generally include your coordinate variables along with your data variables in here.

    # Example code
    ```python
    import xarray as xr

    from py_hamt import HAMT, IPFSStore, IPFSZarr3, create_zarr_encryption_transformers

    ds = ...  # example xarray Dataset with precip and temp data variables
    encryption_key = bytes(32)  # change before using, only for demonstration purposes!
    header = "sample-header".encode()
    encrypt, decrypt = create_zarr_encryption_transformers(
        encryption_key, header, exclude_vars=["temp"]
    )
    hamt = HAMT(
        store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt
    )
    ipfszarr3 = IPFSZarr3(hamt)
    ds.to_zarr(store=ipfszarr3, mode="w")

    print("Attempting to read and print metadata of partially encrypted zarr")
    enc_ds = xr.open_zarr(store=ipfszarr3, read_only=True)
    print(enc_ds)
    assert enc_ds.temp.sum() == ds.temp.sum()
    try:
        enc_ds.precip.sum()
    except Exception:
        print("Couldn't read encrypted variable")
    ```
    """

    if len(encryption_key) != 32:
        raise ValueError("Encryption key is not 32 bytes")

    def _should_transform(key: str) -> bool:
        p = Path(key)

        # Find the first directory name in the path since zarr v3 chunks are stored in a nested directory structure
        # e.g. for Path("precip/c/0/0/1") it would return "precip"
        if p.parts[0] in exclude_vars:
            return False

        # Don't transform metadata files
        if p.name == "zarr.json":
            return False

        return True

    def encrypt(key: str, val: bytes) -> bytes:
        if not _should_transform(key):
            return val

        nonce = get_random_bytes(24)
        cipher = ChaCha20_Poly1305.new(key=encryption_key, nonce=nonce)
        cipher.update(header)
        ciphertext, tag = cipher.encrypt_and_digest(val)
        # + concatenates two byte variables x,y so that it looks like xy
        return nonce + tag + ciphertext

    def decrypt(key: str, val: bytes) -> bytes:
        if not _should_transform(key):
            return val

        nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
        cipher = ChaCha20_Poly1305.new(key=encryption_key, nonce=nonce)
        cipher.update(header)
        plaintext = cipher.decrypt_and_verify(ciphertext, tag)
        return plaintext

    return (encrypt, decrypt)
````
Uses XChaCha20_Poly1305 from the pycryptodome library to perform encryption, while ignoring zarr metadata files.
https://pycryptodome.readthedocs.io/en/latest/src/cipher/chacha20_poly1305.html
Note that the encryption key must always be 32 bytes long. A header is required by the underlying encryption algorithm. Every time a zarr chunk is encrypted, a random 24-byte nonce is generated. This is saved with the chunk for use when reading back.
zarr.json metadata files in a zarr v3 are always ignored, to allow for calculating an encrypted zarr's structure without having the encryption key.
With `exclude_vars` you may also set some variables to be unencrypted. This allows for partially encrypted zarrs which can be loaded into xarray, but the values of encrypted variables cannot be accessed (errors will be thrown). You should generally include your coordinate variables along with your data variables in here.
Example code

```python
import xarray as xr

from py_hamt import HAMT, IPFSStore, IPFSZarr3, create_zarr_encryption_transformers

ds = ...  # example xarray Dataset with precip and temp data variables
encryption_key = bytes(32)  # change before using, only for demonstration purposes!
header = "sample-header".encode()
encrypt, decrypt = create_zarr_encryption_transformers(
    encryption_key, header, exclude_vars=["temp"]
)
hamt = HAMT(
    store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt
)
ipfszarr3 = IPFSZarr3(hamt)
ds.to_zarr(store=ipfszarr3, mode="w")

print("Attempting to read and print metadata of partially encrypted zarr")
enc_ds = xr.open_zarr(store=ipfszarr3, read_only=True)
print(enc_ds)
assert enc_ds.temp.sum() == ds.temp.sum()
try:
    enc_ds.precip.sum()
except Exception:
    print("Couldn't read encrypted variable")
```
````python
class IPFSZarr3(zarr.abc.store.Store):
    """
    While Zarr v2 can use a generic key-value map (MutableMapping) that HAMT already conforms to, Zarr v3 requires storage classes to conform to a new abstract class. IPFSZarr3 just wraps over a HAMT to provide this compatibility.

    An example of how to write and read a zarr, using xarray, is provided below.
    # Write and get CID
    ```python
    import xarray as xr
    from py_hamt import IPFSStore, HAMT, IPFSZarr3

    ds = ...  # some xarray Dataset
    ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore()))
    ds.to_zarr(store=ipfszarr3)
    print(ipfszarr3.hamt.root_node_id)  # The CID of the root, which is used for reading
    ```

    # Read from CID
    ```python
    import xarray as xr
    from multiformats import CID
    from py_hamt import IPFSStore, HAMT, IPFSZarr3

    cid = CID.decode("...")  # the CID for the HAMT root
    ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore(), root_node_id=cid), read_only=True)
    ds = xr.open_zarr(store=ipfszarr3)
    print(ds)
    ```
    """

    hamt: HAMT
    """The internal HAMT. Safe to read the CID from, if done doing operations."""

    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
        super().__init__(read_only=read_only)
        self.hamt = hamt
        if read_only:
            self.hamt.make_read_only()
        else:
            self.hamt.enable_write()

    @property
    def read_only(self) -> bool:
        return self.hamt.read_only

    def __eq__(self, val: object) -> bool:
        if not isinstance(val, IPFSZarr3):
            return False
        return self.hamt.root_node_id == val.hamt.root_node_id

    async def get(
        self,
        key: str,
        prototype: zarr.core.buffer.BufferPrototype,
        byte_range: zarr.abc.store.ByteRequest | None = None,
    ) -> zarr.core.buffer.Buffer | None:
        if key not in self.hamt:
            return
        # We know this value will always be bytes since we only store bytes in the HAMT
        val: bytes = self.hamt[key]  # type: ignore
        return prototype.buffer.from_bytes(val)

        # Hypothetical code for supporting partial reads, but there is not much point since IPFS itself doesn't support partial writes and reads
        # Untested! If for some reason this is being uncommented and then used in the future, this needs to be tested
        # subset: bytes
        # match byte_range:
        #     case None:
        #         subset = val
        #     case zarr.abc.store.RangeByteRequest:
        #         subset = val[byte_range.start : byte_range.end]
        #     case zarr.abc.store.OffsetByteRequest:
        #         subset = val[byte_range.offset :]
        #     case zarr.abc.store.SuffixByteRequest:
        #         subset = val[-byte_range.suffix :]

    async def get_partial_values(
        self,
        prototype: zarr.core.buffer.BufferPrototype,
        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
    ) -> list[zarr.core.buffer.Buffer | None]:
        raise NotImplementedError

    async def exists(self, key: str) -> bool:
        return key in self.hamt

    @property
    def supports_writes(self) -> bool:
        return not self.hamt.read_only

    @property
    def supports_partial_writes(self) -> bool:
        return False

    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        self.hamt[key] = value.to_bytes()

    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
        if key not in self.hamt:
            await self.set(key, value)

    async def set_partial_values(
        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
    ) -> None:
        raise NotImplementedError

    @property
    def supports_deletes(self) -> bool:
        return not self.hamt.read_only

    async def delete(self, key: str) -> None:
        try:
            del self.hamt[key]
        # It's fine if the key was not in the HAMT
        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason
        except KeyError:
            return

    @property
    def supports_listing(self) -> bool:
        return True

    async def list(self) -> AsyncIterator[str]:
        for key in self.hamt:
            yield key

    async def list_prefix(self, prefix: str) -> AsyncIterator:
        for key in self.hamt:
            if key.startswith(prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncIterator:
        for key in self.hamt:
            if key.startswith(prefix):
                suffix = key[len(prefix) :]
                first_slash = suffix.find("/")
                if first_slash == -1:
                    yield suffix
                else:
                    name = suffix[0:first_slash]
                    yield name
````
While Zarr v2 can use a generic key-value map (MutableMapping) that HAMT already conforms to, Zarr v3 requires storage classes to conform to a new abstract class. IPFSZarr3 just wraps over a HAMT to provide this compatibility.

An example of how to write and read a zarr, using xarray, is provided below.

Write and get CID

```python
import xarray as xr
from py_hamt import IPFSStore, HAMT, IPFSZarr3

ds = ...  # some xarray Dataset
ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore()))
ds.to_zarr(store=ipfszarr3)
print(ipfszarr3.hamt.root_node_id)  # The CID of the root, which is used for reading
```
Read from CID

```python
import xarray as xr
from multiformats import CID
from py_hamt import IPFSStore, HAMT, IPFSZarr3

cid = CID.decode("...")  # the CID for the HAMT root
ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore(), root_node_id=cid), read_only=True)
ds = xr.open_zarr(store=ipfszarr3)
print(ds)
```
get

Retrieve the value associated with a given key.

Parameters

key : str
prototype : BufferPrototype
    The prototype of the output buffer. Stores may support a default buffer prototype.
byte_range : ByteRequest, optional
    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.
    - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
    - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
    - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns

Buffer
get_partial_values

Retrieve possibly partial values from given key_ranges. Not implemented by IPFSZarr3; this raises NotImplementedError.

Parameters

prototype : BufferPrototype
    The prototype of the output buffer. Stores may support a default buffer prototype.
key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns

list of values, in the order of the key_ranges, may contain null/none for missing keys
exists

Check if a key exists in the store.
Parameters
key : str
Returns
bool
set

Store a (key, value) pair.

Parameters

key : str
value : Buffer
set_if_not_exists

Store a key to value if the key is not already present.

Parameters

key : str
value : Buffer
set_partial_values

Store values at a given key, starting at byte range_start. Not implemented by IPFSZarr3; this raises NotImplementedError.

Parameters

key_start_values : list[tuple[str, int, BytesLike]]
    Set of (key, range_start, value) triples. A key may occur multiple times with different range_starts; range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key.
delete

Remove a key from the store.

Parameters

key : str
list

Retrieve all keys in the store.

Returns

AsyncIterator[str]
list_prefix

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters

prefix : str

Returns

AsyncIterator[str]
list_dir

Retrieve all keys and prefixes with a given prefix and which do not contain the character "/" after the given prefix.

Parameters

prefix : str

Returns

AsyncIterator[str]
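As a sketch of these listing semantics, here is a small script against a `DictStore`-backed HAMT (IPFSZarr3 only needs a HAMT, so no IPFS daemon is required); the keys are made up to mimic a zarr v3 layout:

```python
import asyncio

from py_hamt import DictStore, HAMT, IPFSZarr3

hamt = HAMT(store=DictStore())
hamt["zarr.json"] = b""
hamt["precip/zarr.json"] = b""
hamt["precip/c/0/0"] = b""

store = IPFSZarr3(hamt)

async def main():
    # Only the names directly under "precip/" are yielded, with deeper paths truncated at the next "/"
    names = sorted([name async for name in store.list_dir("precip/")])
    print(names)  # ['c', 'zarr.json']

asyncio.run(main())
```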