py_hamt

 1from .hamt import HAMT, blake3_hashfn
 2from .store import Store, DictStore, IPFSStore
 3from .zarr_encryption_transformers import create_zarr_encryption_transformers
 4from .ipfszarr3 import IPFSZarr3
 5
 6__all__ = [
 7    "HAMT",
 8    "blake3_hashfn",
 9    "Store",
10    "DictStore",
11    "IPFSStore",
12    "create_zarr_encryption_transformers",
13    "IPFSZarr3",
14]
class HAMT(collections.abc.MutableMapping):
128class HAMT(MutableMapping):
129    """
130    This HAMT presents a key value interface, like a python dictionary. The only limits are that keys can only be strings, and values can only be types amenable with [IPLDKind](https://dag-cbor.readthedocs.io/en/stable/api/dag_cbor.ipld.html#dag_cbor.ipld.IPLDKind). IPLDKind is a fairly flexible data model, but do note that integers are must be within the bounds of a signed 64-bit integer.
131
132    py-hamt uses blake3 with a 32 byte wide hash by default, but to bring your own, read more in the documentation of `blake3_hashfn`.
133
134    # Some notes about thread safety
135
136    Since modifying a HAMT changes all parts of the tree, due to reserializing and saving to the backing store, modificiations are not thread safe. Thus, we offer a read-only mode which allows for parallel accesses, or a thread safe write enabled mode. You can see what type it is from the `read_only` variable. HAMTs default to write enabled mode on creation. Calling mutating operations in read only mode will raise Exceptions.
137
138    In write enabled mode, all operations block and wait, so multiple threads can write to the HAMT but the operations will in reality happen one after the other.
139
140    # transformer_encode, transformer_decode
141    `transformer_encode` and `transformer_decode` are functions that are called right before sending raw data to the store, after being encoded with dag_cbor. They get access to the key that the value originally corresponded to.
142
143    HAMT also sometimes sends its internal data structure objects to the store, those will not pass through these functions.
144
145    # dunder method documentation
146    These are not generated by pdoc automatically so we are including their documentation here.
147
148    ## `__len__`
149    Total number of keys. Note that this will have to scan the entire tree in order to count, which can take a while depending on the speed of the store retrieval.
150
151    ## `__iter__`
152    Generator of all string keys. When initially called, this will freeze the root node and then start iteration, so subsequent sets and deletes that mutate will not be reflected in the keys returned in this iteration.
153
154    ## `__deepcopy__`
155    For use with the python copy module. This creates deep copies of the current HAMT. The only thing it does not copy over is the cache.
156    ```python
157    from copy import deepcopy
158    hamt = # ...
159    copy_hamt = deepcopy(hamt)
160    ```
161    """
162
163    store: Store
164    """@private"""
165    # Every call of this will look like self.hash_fn() and so the first argument will always take a self argument
166    hash_fn: Callable[[bytes], bytes]
167    """@private"""
168    # Only important for writing, when reading this will use buckets even if they are overly big
169    max_bucket_size: int
170    """@private"""
171
172    # Don't use the type alias here since this is exposed in the documentation
173    root_node_id: IPLDKind
174    """DO NOT modify this directly.
175
176    This is the ID that the store returns for the root node of the HAMT. This is exposed since it is sometimes useful to reconstruct other HAMTs later if using a persistent store.
177
178    This is really type IPLDKind, but the documentation generates this strange type instead since IPLDKind is a type union.
179    """
180
181    lock: Lock
182    """
183    @private
184    For use in multithreading
185    """
186
187    read_only: bool
188    """
189    DO NOT modify this directly. This is here for you to read and check.
190
191    You can modify the read status of a HAMT through the `make_read_only` or `enable_write` functions, so that the HAMT will block on making a change until all mutating operations are done.
192    """
193
194    transformer_encode: None | Callable[[str, bytes], bytes]
195    """This function is called to transform the raw bytes of a value after it is encoded with dag_cbor but before it gets sent to the Store."""
196    transformer_decode: None | Callable[[str, bytes], bytes]
197    """This function is called to transform the raw bytes of a value after it is retrieved from the store but before decoding with dag_cbor to python IPLDKind type objects."""
198
199    cache: dict[StoreID, Node]
200    """@private"""
201    max_cache_size_bytes: int
202    """The maximum size of the internal cache of nodes. We expect that some Stores will be high latency, so it is useful to cache parts of the datastructure.
203
204    Set to 0 if you want no cache at all."""
205
206    # We rely on python 3.7+'s dicts which keep insertion order of elements to do a basic LRU
207    def cache_eviction_lru(self):
208        while getsizeof(self.cache) > self.max_cache_size_bytes:
209            if len(self.cache) == 0:
210                return
211            stalest_node_id = next(iter(self.cache.keys()))
212            del self.cache[stalest_node_id]
213
214    def write_node(self, node: Node) -> IPLDKind:
215        """@private"""
216        node_id = self.store.save_dag_cbor(node.serialize())
217        self.cache[node_id] = node
218        self.cache_eviction_lru()
219        return node_id
220
221    def read_node(self, node_id: IPLDKind) -> Node:
222        """@private"""
223        node: Node
224        # cache hit
225        if node_id in self.cache:
226            node = self.cache[node_id]
227            # Reinsert to put this key as the most recently used and last in the dict insertion order
228            del self.cache[node_id]
229            self.cache[node_id] = node
230            # cache miss
231        else:
232            node = Node.deserialize(self.store.load(node_id))
233            self.cache[node_id] = node
234            self.cache_eviction_lru()
235
236        return node
237
238    def __init__(
239        self,
240        store: Store,
241        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
242        read_only: bool = False,
243        root_node_id: IPLDKind = None,
244        max_cache_size_bytes=10_000_000,  # default to 10 megabytes
245        transformer_encode: None | Callable[[str, bytes], bytes] = None,
246        transformer_decode: None | Callable[[str, bytes], bytes] = None,
247    ):
248        self.store = store
249        self.hash_fn = hash_fn
250
251        self.cache = {}
252        self.max_cache_size_bytes = max_cache_size_bytes
253
254        self.max_bucket_size = 4
255        self.read_only = read_only
256        self.lock = Lock()
257
258        self.transformer_encode = transformer_encode
259        self.transformer_decode = transformer_decode
260
261        if root_node_id is None:
262            root_node = Node()
263            self.root_node_id = self.write_node(root_node)
264        else:
265            self.root_node_id = root_node_id
266            # Make sure the cache has our root node
267            self.read_node(self.root_node_id)
268
269    # dunder for the python deepcopy module
270    def __deepcopy__(self, memo) -> "HAMT":
271        if not self.read_only:
272            self.lock.acquire(blocking=True)
273
274        copy_hamt = HAMT(
275            store=self.store,
276            hash_fn=self.hash_fn,
277            read_only=self.read_only,
278            root_node_id=self.root_node_id,
279            transformer_encode=self.transformer_encode,
280            transformer_decode=self.transformer_decode,
281        )
282
283        if not self.read_only:
284            self.lock.release()
285
286        return copy_hamt
287
288    def make_read_only(self):
289        """Disables all mutation of this HAMT. When enabled, the HAMT will throw errors if set or delete are called. When a HAMT is only in read only mode, it allows for safe multithreaded reads, increasing performance."""
290        self.lock.acquire(blocking=True)
291        self.read_only = True
292        self.lock.release()
293
294    def enable_write(self):
295        self.lock.acquire(blocking=True)
296        self.read_only = False
297        self.lock.release()
298
299    def _reserialize_and_link(self, node_stack: list[tuple[Link, Node]]):
300        """
301        For internal use
302        This function starts from the node at the end of the list and reserializes so that each node holds valid new IDs after insertion into the store
303        Takes a stack of nodes, we represent a stack with a list where the first element is the root element and the last element is the top of the stack
304        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python
305        If a node ends up being empty, then it is deleted entirely, unless it is the root node
306        Modifies in place
307        """
308        # Iterate in the reverse direction, imitating going deeper into a stack
309        for stack_index in range(len(node_stack) - 1, -1, -1):
310            old_store_id, node = node_stack[stack_index]
311
312            # If this node is empty, and its not the root node, then we can delete it entirely from the list
313            buckets = node.get_buckets()
314            links = node.get_links()
315            is_empty = buckets == {} and links == {}
316            is_not_root = stack_index > 0
317
318            if is_empty and is_not_root:
319                # Unlink from the rest of the tree
320                _, prev_node = node_stack[stack_index - 1]
321                prev_node._remove_link(old_store_id)
322
323                # Remove from our stack
324                node_stack.pop(stack_index)
325            else:
326                # Reserialize
327                new_store_id = self.write_node(node)
328                node_stack[stack_index] = (new_store_id, node)
329
330                # If this is not the last i.e. root node, we need to change the linking of the node prior in the list
331                if stack_index > 0:
332                    _, prev_node = node_stack[stack_index - 1]
333                    prev_node._replace_link(old_store_id, new_store_id)
334
335    def __setitem__(self, key_to_insert: str, val_to_insert: IPLDKind):
336        if self.read_only:
337            raise Exception("Cannot call set on a read only HAMT")
338
339        if not self.read_only:
340            self.lock.acquire(blocking=True)
341
342        val = dag_cbor.encode(val_to_insert)
343        if self.transformer_encode is not None:
344            val = self.transformer_encode(key_to_insert, val)
345        val_ptr = self.store.save_raw(val)
346
347        node_stack: list[tuple[Link, Node]] = []
348        root_node = self.read_node(self.root_node_id)
349        node_stack.append((self.root_node_id, root_node))
350
351        # FIFO queue to keep track of all the KVs we need to insert
352        # This is important for if any buckets overflow and thus we need to reinsert all those KVs
353        kvs_queue: list[tuple[str, IPLDKind]] = []
354        kvs_queue.append((key_to_insert, val_ptr))
355
356        created_change = False
357        # Keep iterating until we have no more KVs to insert
358        while len(kvs_queue) != 0:
359            _, top_node = node_stack[-1]
360            curr_key, curr_val = kvs_queue[0]
361
362            raw_hash = self.hash_fn(curr_key.encode())
363            map_key = str(extract_bits(raw_hash, len(node_stack) - 1, 8))
364
365            buckets = top_node.get_buckets()
366            links = top_node.get_links()
367
368            if map_key in links and map_key in buckets:
369                self.lock.release()
370                raise Exception(
371                    "Key in both buckets and links of the node, invariant violated"
372                )
373            elif map_key in links:
374                next_node_id = links[map_key]
375                next_node = self.read_node(next_node_id)
376                node_stack.append((next_node_id, next_node))
377            elif map_key in buckets:
378                bucket = buckets[map_key]
379                created_change = True
380
381                # If this bucket already has this same key, just rewrite the value
382                # Since we can't use continues to go back to the top of the while loop, use this boolean flag instead
383                should_continue_at_while = False
384                for kv in bucket:
385                    if curr_key in kv:
386                        kv[curr_key] = curr_val
387                        kvs_queue.pop(0)
388                        should_continue_at_while = True
389                        break
390                if should_continue_at_while:
391                    continue
392
393                bucket_has_space = len(bucket) < self.max_bucket_size
394                if bucket_has_space:
395                    bucket.append({curr_key: curr_val})
396                    kvs_queue.pop(0)
397                # If bucket is full and we need to add, then all these KVs need to be taken out of this bucket and reinserted throughout the tree
398                else:
399                    # Empty the bucket of KVs into the queue
400                    for kv in bucket:
401                        for k, v in kv.items():
402                            kvs_queue.append((k, v))
403
404                    # Delete empty bucket, there should only be a link now
405                    del buckets[map_key]
406
407                    # Create a new link to a new node so that we can reflow these KVs into new buckets
408                    new_node = Node()
409                    new_node_id = self.write_node(new_node)
410
411                    links[map_key] = new_node_id
412
413                    # We need to rerun from the top with the new queue, but this time this node will have a link to put KVs deeper down in the tree
414                    node_stack.append((new_node_id, new_node))
415            else:
416                # If there is no link and no bucket, then we can create a new bucket, insert, and be done with this key
417                bucket: list[dict[str, IPLDKind]] = []
418                bucket.append({curr_key: curr_val})
419                kvs_queue.pop(0)
420                buckets[map_key] = bucket
421                created_change = True
422
423        # Finally, reserialize and fix all links, deleting empty nodes as needed
424        if created_change:
425            self._reserialize_and_link(node_stack)
426            node_stack_top_id = node_stack[0][0]
427            self.root_node_id = node_stack_top_id
428
429        if not self.read_only:
430            self.lock.release()
431
432    def __delitem__(self, key: str):
433        if self.read_only:
434            raise Exception("Cannot call delete on a read only HAMT")
435
436        if not self.read_only:
437            self.lock.acquire(blocking=True)
438
439        raw_hash = self.hash_fn(key.encode())
440
441        node_stack: list[tuple[Link, Node]] = []
442        root_node = self.read_node(self.root_node_id)
443        node_stack.append((self.root_node_id, root_node))
444
445        created_change = False
446        while True:
447            top_id, top_node = node_stack[-1]
448            map_key = str(extract_bits(raw_hash, len(node_stack) - 1, 8))
449
450            buckets = top_node.get_buckets()
451            links = top_node.get_links()
452
453            if map_key in buckets and map_key in links:
454                self.lock.release()
455                raise Exception(
456                    "Key in both buckets and links of the node, invariant violated"
457                )
458            elif map_key in buckets:
459                bucket = buckets[map_key]
460
461                # Delete from within this bucket
462                for bucket_index in range(len(bucket)):
463                    kv = bucket[bucket_index]
464                    if key in kv:
465                        created_change = True
466                        bucket.pop(bucket_index)
467
468                        # If this bucket becomes empty then delete this dict entry for the bucket
469                        if len(bucket) == 0:
470                            del buckets[map_key]
471
472                        # This must be done to avoid IndexErrors after continuing to iterate since the length of the bucket has now changed
473                        break
474
475                break
476            elif map_key in links:
477                link = links[map_key]
478                next_node = self.read_node(link)
479                node_stack.append((link, next_node))
480                continue
481
482            else:
483                # This key is not even in the HAMT so just exit
484                break
485
486        # Finally, reserialize and fix all links, deleting empty nodes as needed
487        if created_change:
488            self._reserialize_and_link(node_stack)
489            node_stack_top_id = node_stack[0][0]
490            self.root_node_id = node_stack_top_id
491
492        if not self.read_only:
493            self.lock.release()
494
495        if not created_change:
496            raise KeyError
497
498    def __getitem__(self, key: str) -> IPLDKind:
499        if not self.read_only:
500            self.lock.acquire(blocking=True)
501
502        raw_hash = self.hash_fn(key.encode())
503
504        node_id_stack: list[Link] = []
505        node_id_stack.append(self.root_node_id)
506
507        # Don't check if result is none but use a boolean to indicate finding something, this is because None is a possible value the HAMT can store
508        result_ptr: IPLDKind = None
509        found_a_result: bool = False
510        while True:
511            top_id = node_id_stack[-1]
512            top_node = self.read_node(top_id)
513            map_key = str(extract_bits(raw_hash, len(node_id_stack) - 1, 8))
514
515            # Check if this node is in one of the buckets
516            buckets = top_node.get_buckets()
517            if map_key in buckets:
518                bucket = buckets[map_key]
519                for kv in bucket:
520                    if key in kv:
521                        result_ptr = kv[key]
522                        found_a_result = True
523                        break
524
525            # If it isn't in one of the buckets, check if there's a link to another serialized node id
526            links = top_node.get_links()
527            if map_key in links:
528                link_id = links[map_key]
529                node_id_stack.append(link_id)
530                continue
531            # Nowhere left to go, stop walking down the tree
532            else:
533                break
534
535        if not self.read_only:
536            self.lock.release()
537
538        if not found_a_result:
539            raise KeyError
540
541        result_bytes = self.store.load(result_ptr)
542        if self.transformer_decode is not None:
543            result_bytes = self.transformer_decode(key, result_bytes)
544        return dag_cbor.decode(result_bytes)
545
546    def __len__(self) -> int:
547        key_count = 0
548        for _ in self:
549            key_count += 1
550
551        return key_count
552
553    def __iter__(self):
554        if not self.read_only:
555            self.lock.acquire(blocking=True)
556
557        node_id_stack = []
558        node_id_stack.append(self.root_node_id)
559
560        if not self.read_only:
561            self.lock.release()
562
563        while True:
564            if len(node_id_stack) == 0:
565                break
566
567            top_id = node_id_stack.pop()
568            node = self.read_node(top_id)
569
570            # Collect all keys from all buckets
571            buckets = node.get_buckets()
572            for bucket in buckets.values():
573                for kv in bucket:
574                    for k in kv:
575                        yield k
576
577            # Traverse down list of links
578            links = node.get_links()
579            for link in links.values():
580                node_id_stack.append(link)
581
582    def ids(self):
583        """Generator of all IDs the backing store uses"""
584        if not self.read_only:
585            self.lock.acquire(blocking=True)
586
587        node_id_stack: list[Link] = []
588        node_id_stack.append(self.root_node_id)
589
590        if not self.read_only:
591            self.lock.release()
592
593        while len(node_id_stack) > 0:
594            top_id = node_id_stack.pop()
595            yield top_id
596            top_node = self.read_node(top_id)
597
598            buckets = top_node.get_buckets()
599            for bucket in buckets.values():
600                for kv in bucket:
601                    for v in kv.values():
602                        yield v
603
604            # Traverse down list of ids that are the store's links'
605            links = top_node.get_links()
606            for link in links.values():
607                node_id_stack.append(link)

This HAMT presents a key value interface, like a python dictionary. The only limits are that keys can only be strings, and values can only be types amenable with IPLDKind. IPLDKind is a fairly flexible data model, but do note that integers are must be within the bounds of a signed 64-bit integer.

py-hamt uses blake3 with a 32 byte wide hash by default, but to bring your own, read more in the documentation of blake3_hashfn.

Some notes about thread safety

Since modifying a HAMT changes all parts of the tree, due to reserializing and saving to the backing store, modificiations are not thread safe. Thus, we offer a read-only mode which allows for parallel accesses, or a thread safe write enabled mode. You can see what type it is from the read_only variable. HAMTs default to write enabled mode on creation. Calling mutating operations in read only mode will raise Exceptions.

In write enabled mode, all operations block and wait, so multiple threads can write to the HAMT but the operations will in reality happen one after the other.

transformer_encode, transformer_decode

transformer_encode and transformer_decode are functions that are called right before sending raw data to the store, after being encoded with dag_cbor. They get access to the key that the value originally corresponded to.

HAMT also sometimes sends its internal data structure objects to the store, those will not pass through these functions.

dunder method documentation

These are not generated by pdoc automatically so we are including their documentation here.

__len__

Total number of keys. Note that this will have to scan the entire tree in order to count, which can take a while depending on the speed of the store retrieval.

__iter__

Generator of all string keys. When initially called, this will freeze the root node and then start iteration, so subsequent sets and deletes that mutate will not be reflected in the keys returned in this iteration.

__deepcopy__

For use with the python copy module. This creates deep copies of the current HAMT. The only thing it does not copy over is the cache.

from copy import deepcopy
hamt = # ...
copy_hamt = deepcopy(hamt)
HAMT( store: Store, hash_fn: Callable[[bytes], bytes] = <function blake3_hashfn>, read_only: bool = False, root_node_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]] = None, max_cache_size_bytes=10000000, transformer_encode: Optional[Callable[[str, bytes], bytes]] = None, transformer_decode: Optional[Callable[[str, bytes], bytes]] = None)
238    def __init__(
239        self,
240        store: Store,
241        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
242        read_only: bool = False,
243        root_node_id: IPLDKind = None,
244        max_cache_size_bytes=10_000_000,  # default to 10 megabytes
245        transformer_encode: None | Callable[[str, bytes], bytes] = None,
246        transformer_decode: None | Callable[[str, bytes], bytes] = None,
247    ):
248        self.store = store
249        self.hash_fn = hash_fn
250
251        self.cache = {}
252        self.max_cache_size_bytes = max_cache_size_bytes
253
254        self.max_bucket_size = 4
255        self.read_only = read_only
256        self.lock = Lock()
257
258        self.transformer_encode = transformer_encode
259        self.transformer_decode = transformer_decode
260
261        if root_node_id is None:
262            root_node = Node()
263            self.root_node_id = self.write_node(root_node)
264        else:
265            self.root_node_id = root_node_id
266            # Make sure the cache has our root node
267            self.read_node(self.root_node_id)
root_node_id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]

DO NOT modify this directly.

This is the ID that the store returns for the root node of the HAMT. This is exposed since it is sometimes useful to reconstruct other HAMTs later if using a persistent store.

This is really type IPLDKind, but the documentation generates this strange type instead since IPLDKind is a type union.

read_only: bool

DO NOT modify this directly. This is here for you to read and check.

You can modify the read status of a HAMT through the make_read_only or enable_write functions, so that the HAMT will block on making a change until all mutating operations are done.

transformer_encode: Optional[Callable[[str, bytes], bytes]]

This function is called to transform the raw bytes of a value after it is encoded with dag_cbor but before it gets sent to the Store.

transformer_decode: Optional[Callable[[str, bytes], bytes]]

This function is called to transform the raw bytes of a value after it is retrieved from the store but before decoding with dag_cbor to python IPLDKind type objects.

max_cache_size_bytes: int

The maximum size of the internal cache of nodes. We expect that some Stores will be high latency, so it is useful to cache parts of the datastructure.

Set to 0 if you want no cache at all.

def cache_eviction_lru(self):
207    def cache_eviction_lru(self):
208        while getsizeof(self.cache) > self.max_cache_size_bytes:
209            if len(self.cache) == 0:
210                return
211            stalest_node_id = next(iter(self.cache.keys()))
212            del self.cache[stalest_node_id]
def make_read_only(self):
288    def make_read_only(self):
289        """Disables all mutation of this HAMT. When enabled, the HAMT will throw errors if set or delete are called. When a HAMT is only in read only mode, it allows for safe multithreaded reads, increasing performance."""
290        self.lock.acquire(blocking=True)
291        self.read_only = True
292        self.lock.release()

Disables all mutation of this HAMT. When enabled, the HAMT will throw errors if set or delete are called. When a HAMT is only in read only mode, it allows for safe multithreaded reads, increasing performance.

def enable_write(self):
294    def enable_write(self):
295        self.lock.acquire(blocking=True)
296        self.read_only = False
297        self.lock.release()
def ids(self):
582    def ids(self):
583        """Generator of all IDs the backing store uses"""
584        if not self.read_only:
585            self.lock.acquire(blocking=True)
586
587        node_id_stack: list[Link] = []
588        node_id_stack.append(self.root_node_id)
589
590        if not self.read_only:
591            self.lock.release()
592
593        while len(node_id_stack) > 0:
594            top_id = node_id_stack.pop()
595            yield top_id
596            top_node = self.read_node(top_id)
597
598            buckets = top_node.get_buckets()
599            for bucket in buckets.values():
600                for kv in bucket:
601                    for v in kv.values():
602                        yield v
603
604            # Traverse down list of ids that are the store's links'
605            links = top_node.get_links()
606            for link in links.values():
607                node_id_stack.append(link)

Generator of all IDs the backing store uses

def blake3_hashfn(input_bytes: bytes) -> bytes:
114def blake3_hashfn(input_bytes: bytes) -> bytes:
115    """
116    This is provided as a default recommended hash function by py-hamt. It uses the blake3 hash function and uses 32 bytes as the hash size.
117
118    To bring your own hash function, just create a function that takes in bytes and returns the hash bytes, and use that in the HAMT init method.
119
120    It's important to note that the resulting hash must must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.
121    """
122    # 32 bytes is the recommended byte size for blake3 and the default, but multihash forces us to explicitly specify
123    digest = b3.digest(input_bytes, size=32)
124    raw_bytes = b3.unwrap(digest)
125    return raw_bytes

This is provided as a default recommended hash function by py-hamt. It uses the blake3 hash function and uses 32 bytes as the hash size.

To bring your own hash function, just create a function that takes in bytes and returns the hash bytes, and use that in the HAMT init method.

It's important to note that the resulting hash must must always be a multiple of 8 bits since python bytes object can only represent in segments of bytes, and thus 8 bits.

class Store(abc.ABC):
11class Store(ABC):
12    """Abstract class that represents a storage mechanism the HAMT can use for keeping data.
13
14    The return type of save and input to load is really type IPLDKind, but the documentation generates something a bit strange since IPLDKind is a type union.
15    """
16
17    @abstractmethod
18    def save_raw(self, data: bytes) -> IPLDKind:
19        """Take any set of bytes, save it to the storage mechanism, and return an ID in the type of IPLDKind that can be used to retrieve those bytes later."""
20
21    @abstractmethod
22    def save_dag_cbor(self, data: bytes) -> IPLDKind:
23        """Take a set of bytes and save it just like `save_raw`, except this method has additional context that the data is in a dag-cbor format."""
24
25    @abstractmethod
26    def load(self, id: IPLDKind) -> bytes:
27        """Retrieve the bytes based on an ID returned earlier by the save function."""

Abstract class that represents a storage mechanism the HAMT can use for keeping data.

The return type of save and input to load is really type IPLDKind, but the documentation generates something a bit strange since IPLDKind is a type union.

@abstractmethod
def save_raw( self, data: bytes) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
17    @abstractmethod
18    def save_raw(self, data: bytes) -> IPLDKind:
19        """Take any set of bytes, save it to the storage mechanism, and return an ID in the type of IPLDKind that can be used to retrieve those bytes later."""

Take any set of bytes, save it to the storage mechanism, and return an ID in the type of IPLDKind that can be used to retrieve those bytes later.

@abstractmethod
def save_dag_cbor( self, data: bytes) -> Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]:
21    @abstractmethod
22    def save_dag_cbor(self, data: bytes) -> IPLDKind:
23        """Take a set of bytes and save it just like `save_raw`, except this method has additional context that the data is in a dag-cbor format."""

Take a set of bytes and save it just like save_raw, except this method has additional context that the data is in a dag-cbor format.

@abstractmethod
def load( self, id: Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]], Dict[str, Union[NoneType, bool, int, float, str, bytes, multiformats.cid.CID, List[ForwardRef('IPLDKind')], Dict[str, ForwardRef('IPLDKind')]]]]) -> bytes:
25    @abstractmethod
26    def load(self, id: IPLDKind) -> bytes:
27        """Retrieve the bytes based on an ID returned earlier by the save function."""

Retrieve the bytes based on an ID returned earlier by the save function.

class DictStore(py_hamt.Store):
31class DictStore(Store):
32    """A basic implementation of a backing store, mostly for demonstration and testing purposes. It hashes all inputs and uses that as a key to an in-memory python dict. The hash bytes are the ID that `save` returns and `load` takes in."""
33
34    store: dict[bytes, bytes]
35    """@private"""
36    hash_alg: Multihash
37    """@private"""
38
39    def __init__(self):
40        self.store = {}
41        self.hash_alg = multihash.get("blake3")
42
43    def save(self, data: bytes) -> bytes:
44        hash = self.hash_alg.digest(data, size=32)
45        self.store[hash] = data
46        return hash
47
48    def save_raw(self, data: bytes) -> bytes:
49        """"""
50        return self.save(data)
51
52    def save_dag_cbor(self, data: bytes) -> bytes:
53        """"""
54        return self.save(data)
55
56    # Ignore the type error since bytes is in the IPLDKind type
57    def load(self, id: bytes) -> bytes:  # type: ignore
58        """"""
59        if id in self.store:
60            return self.store[id]
61        else:
62            raise Exception("ID not found in store")

A basic implementation of a backing store, mostly for demonstration and testing purposes. It hashes all inputs and uses that as a key to an in-memory python dict. The hash bytes are the ID that save returns and load takes in.

def save(self, data: bytes) -> bytes:
43    def save(self, data: bytes) -> bytes:
44        hash = self.hash_alg.digest(data, size=32)
45        self.store[hash] = data
46        return hash
def save_raw(self, data: bytes) -> bytes:
48    def save_raw(self, data: bytes) -> bytes:
49        """"""
50        return self.save(data)
def save_dag_cbor(self, data: bytes) -> bytes:
52    def save_dag_cbor(self, data: bytes) -> bytes:
53        """"""
54        return self.save(data)
def load(self, id: bytes) -> bytes:
57    def load(self, id: bytes) -> bytes:  # type: ignore
58        """"""
59        if id in self.store:
60            return self.store[id]
61        else:
62            raise Exception("ID not found in store")
class IPFSStore(py_hamt.Store):
 65class IPFSStore(Store):
 66    """
 67    Use IPFS as a backing store for a HAMT. The IDs returned from save and used by load are IPFS CIDs.
 68
 69    Save methods use the RPC API but `load` uses the HTTP Gateway, so read-only HAMTs will only access the HTTP Gateway. This allows for connection to remote gateways as well.
 70
 71    You can write to an authenticated IPFS node by providing credentials in the constructor. The following authentication methods are supported:
 72    - Basic Authentication: Provide a tuple of (username, password) to the `basic_auth` parameter.
 73    - Bearer Token: Provide a bearer token to the `bearer_token` parameter.
 74    - API Key: Provide an API key to the `api_key` parameter. You can customize the header name for the API key by setting the `api_key_header` parameter.
 75    """
 76
 77    def __init__(
 78        self,
 79        timeout_seconds: int = 30,
 80        gateway_uri_stem: str = "http://127.0.0.1:8080",
 81        rpc_uri_stem: str = "http://127.0.0.1:5001",
 82        hasher: str = "blake3",
 83        pin_on_add: bool = False,
 84        debug: bool = False,
 85        # Authentication parameters
 86        basic_auth: tuple[str, str] | None = None,  # (username, password)
 87        bearer_token: str | None = None,
 88        api_key: str | None = None,
 89        api_key_header: str = "X-API-Key",  # Customizable API key header
 90    ):
 91        self.timeout_seconds = timeout_seconds
 92        """
 93        You can modify this variable directly if you choose.
 94
 95        This sets the timeout in seconds for all HTTP requests.
 96        """
 97        self.gateway_uri_stem = gateway_uri_stem
 98        """
 99        URI stem of the IPFS HTTP gateway that IPFSStore will retrieve blocks from.
100        """
101        self.rpc_uri_stem = rpc_uri_stem
102        """URI Stem of the IPFS RPC API that IPFSStore will send data to."""
103        self.hasher = hasher
104        """The hash function to send to IPFS when storing bytes."""
105        self.pin_on_add: bool = pin_on_add
106        """Whether IPFSStore should tell the daemon to pin the generated CIDs in API calls. This can be changed in between usage, but should be kept the same value for the lifetime of the program."""
107        self.debug: bool = debug
108        """If true, this records the total number of bytes sent in and out of IPFSStore to the network. You can access this information in `total_sent` and `total_received`. Bytes are counted in terms of either how much was sent to IPFS to store a CID, or how much data was inside of a retrieved IPFS block. This does not include the overhead of the HTTP requests themselves."""
109        self.total_sent: None | int = 0 if debug else None
110        """Total bytes sent to IPFS. Used for debugging purposes."""
111        self.total_received: None | int = 0 if debug else None
112        """Total bytes in responses from IPFS for blocks. Used for debugging purposes."""
113
114        # Authentication settings
115        self.basic_auth = basic_auth
116        """Tuple of (username, password) for Basic Authentication"""
117        self.bearer_token = bearer_token
118        """Bearer token for token-based authentication"""
119        self.api_key = api_key
120        """API key for API key-based authentication"""
121        self.api_key_header = api_key_header
122        """Header name to use for API key authentication"""
123
124    def save(self, data: bytes, cid_codec: str) -> CID:
125        """
126        This saves the data to an ipfs daemon by calling the RPC API, and then returns the CID, with a multicodec set by the input cid_codec. We need to do this since the API always returns either a multicodec of raw or dag-pb if it had to shard the input data.
127
128        By default, `save` pins content it adds.
129
130        ```python
131        from py_hamt import IPFSStore
132
133        ipfs_store = IPFSStore()
134        cid = ipfs_store.save("foo".encode(), "raw")
135        print(cid.human_readable)
136        ```
137        """
138        pin_string: str = "true" if self.pin_on_add else "false"
139
140        # Apply authentication based on provided credentials
141        headers = {}
142        if self.bearer_token:
143            headers["Authorization"] = f"Bearer {self.bearer_token}"
144        elif self.api_key:
145            headers[self.api_key_header] = self.api_key
146
147        # Prepare request parameters
148        url = f"{self.rpc_uri_stem}/api/v0/add?hash={self.hasher}&pin={pin_string}"
149
150        # Make the request with appropriate authentication
151        response = requests.post(
152            url,
153            files={"file": data},
154            headers=headers,
155            auth=self.basic_auth,
156            timeout=self.timeout_seconds,
157        )
158        response.raise_for_status()
159
160        cid_str: str = json.decode(response.content)["Hash"]  # type: ignore
161        cid = CID.decode(cid_str)
162        # If it's dag-pb it means we should not reset the cid codec, since this is a UnixFS entry for a large amount of data that thus had to be sharded
163        # We don't worry about HAMT nodes being larger than 1 MB
164        # with a conservative calculation of 256 map keys * 10 (bucket size of 9 and 1 link per map key)*100 bytes huge size for a cid=0.256 MB, so we can always safely recodec those as dag-cbor, which is what they are
165        # 0x70 means dag-pb
166        if cid.codec.code != 0x70:
167            cid = cid.set(codec=cid_codec)
168
169        # if everything is succesful, record debug information
170        if self.debug:
171            self.total_sent += len(data)  # type: ignore
172
173        return cid
174
175    def save_raw(self, data: bytes) -> CID:
176        """See `save`"""
177        return self.save(data, "raw")
178
179    def save_dag_cbor(self, data: bytes) -> CID:
180        """See `save`"""
181        return self.save(data, "dag-cbor")
182
183    # Ignore the type error since CID is in the IPLDKind type
184    def load(self, id: CID) -> bytes:  # type: ignore
185        """
186        This retrieves the raw bytes by calling the provided HTTP gateway.
187        ```python
188        from py_hamt import IPFSStore
189        from multiformats import CID
190
191        ipfs_store = IPFSStore()
192        # This is just an example CID
193        cid = CID.decode("bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze")
194        data = ipfs_store.load(cid)
195        print(data)
196        ```
197        """
198        response = requests.get(
199            f"{self.gateway_uri_stem}/ipfs/{str(id)}", timeout=self.timeout_seconds
200        )
201        response.raise_for_status()
202
203        if self.debug:
204            self.total_received += len(response.content)  # type: ignore
205
206        return response.content

Use IPFS as a backing store for a HAMT. The IDs returned from save and used by load are IPFS CIDs.

Save methods use the RPC API but load uses the HTTP Gateway, so read-only HAMTs will only access the HTTP Gateway. This allows for connection to remote gateways as well.

You can write to an authenticated IPFS node by providing credentials in the constructor. The following authentication methods are supported:

  • Basic Authentication: Provide a tuple of (username, password) to the basic_auth parameter.
  • Bearer Token: Provide a bearer token to the bearer_token parameter.
  • API Key: Provide an API key to the api_key parameter. You can customize the header name for the API key by setting the api_key_header parameter.
IPFSStore( timeout_seconds: int = 30, gateway_uri_stem: str = 'http://127.0.0.1:8080', rpc_uri_stem: str = 'http://127.0.0.1:5001', hasher: str = 'blake3', pin_on_add: bool = False, debug: bool = False, basic_auth: tuple[str, str] | None = None, bearer_token: str | None = None, api_key: str | None = None, api_key_header: str = 'X-API-Key')
 77    def __init__(
 78        self,
 79        timeout_seconds: int = 30,
 80        gateway_uri_stem: str = "http://127.0.0.1:8080",
 81        rpc_uri_stem: str = "http://127.0.0.1:5001",
 82        hasher: str = "blake3",
 83        pin_on_add: bool = False,
 84        debug: bool = False,
 85        # Authentication parameters
 86        basic_auth: tuple[str, str] | None = None,  # (username, password)
 87        bearer_token: str | None = None,
 88        api_key: str | None = None,
 89        api_key_header: str = "X-API-Key",  # Customizable API key header
 90    ):
 91        self.timeout_seconds = timeout_seconds
 92        """
 93        You can modify this variable directly if you choose.
 94
 95        This sets the timeout in seconds for all HTTP requests.
 96        """
 97        self.gateway_uri_stem = gateway_uri_stem
 98        """
 99        URI stem of the IPFS HTTP gateway that IPFSStore will retrieve blocks from.
100        """
101        self.rpc_uri_stem = rpc_uri_stem
102        """URI Stem of the IPFS RPC API that IPFSStore will send data to."""
103        self.hasher = hasher
104        """The hash function to send to IPFS when storing bytes."""
105        self.pin_on_add: bool = pin_on_add
106        """Whether IPFSStore should tell the daemon to pin the generated CIDs in API calls. This can be changed in between usage, but should be kept the same value for the lifetime of the program."""
107        self.debug: bool = debug
108        """If true, this records the total number of bytes sent in and out of IPFSStore to the network. You can access this information in `total_sent` and `total_received`. Bytes are counted in terms of either how much was sent to IPFS to store a CID, or how much data was inside of a retrieved IPFS block. This does not include the overhead of the HTTP requests themselves."""
109        self.total_sent: None | int = 0 if debug else None
110        """Total bytes sent to IPFS. Used for debugging purposes."""
111        self.total_received: None | int = 0 if debug else None
112        """Total bytes in responses from IPFS for blocks. Used for debugging purposes."""
113
114        # Authentication settings
115        self.basic_auth = basic_auth
116        """Tuple of (username, password) for Basic Authentication"""
117        self.bearer_token = bearer_token
118        """Bearer token for token-based authentication"""
119        self.api_key = api_key
120        """API key for API key-based authentication"""
121        self.api_key_header = api_key_header
122        """Header name to use for API key authentication"""
timeout_seconds

You can modify this variable directly if you choose.

This sets the timeout in seconds for all HTTP requests.

gateway_uri_stem

URI stem of the IPFS HTTP gateway that IPFSStore will retrieve blocks from.

rpc_uri_stem

URI Stem of the IPFS RPC API that IPFSStore will send data to.

hasher

The hash function to send to IPFS when storing bytes.

pin_on_add: bool

Whether IPFSStore should tell the daemon to pin the generated CIDs in API calls. This can be changed in between usage, but should be kept the same value for the lifetime of the program.

debug: bool

If true, this records the total number of bytes sent in and out of IPFSStore to the network. You can access this information in total_sent and total_received. Bytes are counted in terms of either how much was sent to IPFS to store a CID, or how much data was inside of a retrieved IPFS block. This does not include the overhead of the HTTP requests themselves.

total_sent: None | int

Total bytes sent to IPFS. Used for debugging purposes.

total_received: None | int

Total bytes in responses from IPFS for blocks. Used for debugging purposes.

basic_auth

Tuple of (username, password) for Basic Authentication

bearer_token

Bearer token for token-based authentication

api_key

API key for API key-based authentication

api_key_header

Header name to use for API key authentication

def save(self, data: bytes, cid_codec: str) -> multiformats.cid.CID:
124    def save(self, data: bytes, cid_codec: str) -> CID:
125        """
126        This saves the data to an ipfs daemon by calling the RPC API, and then returns the CID, with a multicodec set by the input cid_codec. We need to do this since the API always returns either a multicodec of raw or dag-pb if it had to shard the input data.
127
128        By default, `save` pins content it adds.
129
130        ```python
131        from py_hamt import IPFSStore
132
133        ipfs_store = IPFSStore()
134        cid = ipfs_store.save("foo".encode(), "raw")
135        print(cid.human_readable)
136        ```
137        """
138        pin_string: str = "true" if self.pin_on_add else "false"
139
140        # Apply authentication based on provided credentials
141        headers = {}
142        if self.bearer_token:
143            headers["Authorization"] = f"Bearer {self.bearer_token}"
144        elif self.api_key:
145            headers[self.api_key_header] = self.api_key
146
147        # Prepare request parameters
148        url = f"{self.rpc_uri_stem}/api/v0/add?hash={self.hasher}&pin={pin_string}"
149
150        # Make the request with appropriate authentication
151        response = requests.post(
152            url,
153            files={"file": data},
154            headers=headers,
155            auth=self.basic_auth,
156            timeout=self.timeout_seconds,
157        )
158        response.raise_for_status()
159
160        cid_str: str = json.decode(response.content)["Hash"]  # type: ignore
161        cid = CID.decode(cid_str)
162        # If it's dag-pb it means we should not reset the cid codec, since this is a UnixFS entry for a large amount of data that thus had to be sharded
163        # We don't worry about HAMT nodes being larger than 1 MB
164        # with a conservative calculation of 256 map keys * 10 (bucket size of 9 and 1 link per map key)*100 bytes huge size for a cid=0.256 MB, so we can always safely recodec those as dag-cbor, which is what they are
165        # 0x70 means dag-pb
166        if cid.codec.code != 0x70:
167            cid = cid.set(codec=cid_codec)
168
169        # if everything is succesful, record debug information
170        if self.debug:
171            self.total_sent += len(data)  # type: ignore
172
173        return cid

This saves the data to an ipfs daemon by calling the RPC API, and then returns the CID, with a multicodec set by the input cid_codec. We need to do this since the API always returns either a multicodec of raw or dag-pb if it had to shard the input data.

By default, save pins content it adds.

from py_hamt import IPFSStore

ipfs_store = IPFSStore()
cid = ipfs_store.save("foo".encode(), "raw")
print(cid.human_readable)
def save_raw(self, data: bytes) -> multiformats.cid.CID:
175    def save_raw(self, data: bytes) -> CID:
176        """See `save`"""
177        return self.save(data, "raw")

See save

def save_dag_cbor(self, data: bytes) -> multiformats.cid.CID:
179    def save_dag_cbor(self, data: bytes) -> CID:
180        """See `save`"""
181        return self.save(data, "dag-cbor")

See save

def load(self, id: multiformats.cid.CID) -> bytes:
184    def load(self, id: CID) -> bytes:  # type: ignore
185        """
186        This retrieves the raw bytes by calling the provided HTTP gateway.
187        ```python
188        from py_hamt import IPFSStore
189        from multiformats import CID
190
191        ipfs_store = IPFSStore()
192        # This is just an example CID
193        cid = CID.decode("bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze")
194        data = ipfs_store.load(cid)
195        print(data)
196        ```
197        """
198        response = requests.get(
199            f"{self.gateway_uri_stem}/ipfs/{str(id)}", timeout=self.timeout_seconds
200        )
201        response.raise_for_status()
202
203        if self.debug:
204            self.total_received += len(response.content)  # type: ignore
205
206        return response.content

This retrieves the raw bytes by calling the provided HTTP gateway.

from py_hamt import IPFSStore
from multiformats import CID

ipfs_store = IPFSStore()
# This is just an example CID
cid = CID.decode("bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze")
data = ipfs_store.load(cid)
print(data)
def create_zarr_encryption_transformers( encryption_key: bytes, header: bytes, exclude_vars: list[str] = []) -> tuple[TransformerFN, TransformerFN]:
11def create_zarr_encryption_transformers(
12    encryption_key: bytes,
13    header: bytes,
14    exclude_vars: list[str] = [],
15) -> tuple[TransformerFN, TransformerFN]:
16    """
17    Uses XChaCha20_Poly1305 from the pycryptodome library to perform encryption, while ignoring zarr metadata files.
18
19    https://pycryptodome.readthedocs.io/en/latest/src/cipher/chacha20_poly1305.html
20
21    Note that the encryption key must always be 32 bytes long. A header is required by the underlying encryption algorithm. Every time a zarr chunk is encrypted, a random 24-byte nonce is generated. This is saved with the chunk for use when reading back.
22
23    zarr.json metadata files in a zarr v3 are always ignored, to allow for calculating an encrypted zarr's structure without having the encryption key.
24
25    With `exclude_vars` you may also set some variables to be unencrypted. This allows for partially encrypted zarrs which can be loaded into xarray but the values of encrypted variables cannot be accessed (errors will be thrown). You should generally include your coordinate variables along with your data variables in here.
26
27    # Example code
28    ```python
29    from py_hamt import HAMT, IPFSStore, IPFSZarr3
30
31    ds = ... # example xarray Dataset with precip and temp data variables
32    encryption_key = bytes(32) # change before using, only for demonstration purposes!
33    header = "sample-header".encode()
34    encrypt, decrypt = create_zarr_encryption_transformers(
35        encryption_key, header, exclude_vars=["temp"]
36    )
37    hamt = HAMT(
38        store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt
39    )
40    ipfszarr3 = IPFSZarr3(hamt)
41    ds.to_zarr(store=ipfszarr3, mode="w")
42
43    print("Attempting to read and print metadata of partially encrypted zarr")
44    enc_ds = xr.open_zarr(store=ipfszarr3, read_only=True)
45    print(enc_ds)
46    assert enc_ds.temp.sum() == ds.temp.sum()
47    try:
48        enc_ds.precip.sum()
49    except:
50        print("Couldn't read encrypted variable")
51    ```
52    """
53
54    if len(encryption_key) != 32:
55        raise ValueError("Encryption key is not 32 bytes")
56
57    def _should_transform(key: str) -> bool:
58        p = Path(key)
59
60        # Find the first directory name in the path since zarr v3 chunks are stored in a nested directory structure
61        # e.g. for Path("precip/c/0/0/1") it would return "precip"
62        if p.parts[0] in exclude_vars:
63            return False
64
65        # Don't transform metadata files
66        if p.name == "zarr.json":
67            return False
68
69        return True
70
71    def encrypt(key: str, val: bytes) -> bytes:
72        if not _should_transform(key):
73            return val
74
75        nonce = get_random_bytes(24)
76        cipher = ChaCha20_Poly1305.new(key=encryption_key, nonce=nonce)
77        cipher.update(header)
78        ciphertext, tag = cipher.encrypt_and_digest(val)
79        # + concatenates two byte variables x,y so that it looks like xy
80        return nonce + tag + ciphertext
81
82    def decrypt(key: str, val: bytes) -> bytes:
83        if not _should_transform(key):
84            return val
85
86        nonce, tag, ciphertext = val[:24], val[24:40], val[40:]
87        cipher = ChaCha20_Poly1305.new(key=encryption_key, nonce=nonce)
88        cipher.update(header)
89        plaintext = cipher.decrypt_and_verify(ciphertext, tag)
90        return plaintext
91
92    return (encrypt, decrypt)

Uses XChaCha20_Poly1305 from the pycryptodome library to perform encryption, while ignoring zarr metadata files.

https://pycryptodome.readthedocs.io/en/latest/src/cipher/chacha20_poly1305.html

Note that the encryption key must always be 32 bytes long. A header is required by the underlying encryption algorithm. Every time a zarr chunk is encrypted, a random 24-byte nonce is generated. This is saved with the chunk for use when reading back.

zarr.json metadata files in a zarr v3 are always ignored, to allow for calculating an encrypted zarr's structure without having the encryption key.

With exclude_vars you may also set some variables to be unencrypted. This allows for partially encrypted zarrs which can be loaded into xarray but the values of encrypted variables cannot be accessed (errors will be thrown). You should generally include your coordinate variables along with your data variables in here.

Example code

from py_hamt import HAMT, IPFSStore, IPFSZarr3

ds = ... # example xarray Dataset with precip and temp data variables
encryption_key = bytes(32) # change before using, only for demonstration purposes!
header = "sample-header".encode()
encrypt, decrypt = create_zarr_encryption_transformers(
    encryption_key, header, exclude_vars=["temp"]
)
hamt = HAMT(
    store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt
)
ipfszarr3 = IPFSZarr3(hamt)
ds.to_zarr(store=ipfszarr3, mode="w")

print("Attempting to read and print metadata of partially encrypted zarr")
enc_ds = xr.open_zarr(store=ipfszarr3, read_only=True)
print(enc_ds)
assert enc_ds.temp.sum() == ds.temp.sum()
try:
    enc_ds.precip.sum()
except:
    print("Couldn't read encrypted variable")
class IPFSZarr3(zarr.abc.store.Store):
 10class IPFSZarr3(zarr.abc.store.Store):
 11    """
 12    While Zarr v2 can use a generic key-value map (MutableMapping) that HAMT already conforms to, Zarr v3s require storage classes to conform to a new abstract class. IPFSZarr3 just wraps over a HAMT to provide this compatibility.
 13
 14    An example of how to write and read a zarr, using xarray, is provided below.
 15    # Write and get CID
 16    ```python
 17    import xarray as xr
 18    from py_hamt import IPFSStore, HAMT, IPFSZarr3
 19
 20    ds = ... # some xarray Dataset
 21    ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore()))
 22    xr.to_zarr(store=ipfszarr3)
 23    print(ipfszarr3.hamt.root_node_id) # The CID of the root, which is used for reading
 24    ```
 25
 26    # Read from CID
 27    ```python
 28    import xarray as xr
 29    from multiformats import CID
 30    from py_hamt import IPFSStore, HAMT, IPFSZarr3
 31
 32    cid = CID.decode("...") # the CID for the HAMT root
 33    ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore(), root_node_id=cid), read_only=True)
 34    ds = xr.open_zarr(store=ipfszarr3)
 35    print(ds)
 36    ```
 37    """
 38
 39    hamt: HAMT
 40    """The internal HAMT. Safe to read the CID from, if done doing operations."""
 41
 42    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
 43        super().__init__(read_only=read_only)
 44        self.hamt = hamt
 45        if read_only:
 46            self.hamt.make_read_only()
 47        else:
 48            self.hamt.enable_write()
 49
 50    @property
 51    def read_only(self) -> bool:
 52        return self.hamt.read_only
 53
 54    def __eq__(self, val: object) -> bool:
 55        if not isinstance(val, IPFSZarr3):
 56            return False
 57        return self.hamt.root_node_id == val.hamt.root_node_id
 58
 59    async def get(
 60        self,
 61        key: str,
 62        prototype: zarr.core.buffer.BufferPrototype,
 63        byte_range: zarr.abc.store.ByteRequest | None = None,
 64    ) -> zarr.core.buffer.Buffer | None:
 65        if key not in self.hamt:
 66            return
 67        # We know this value will always be bytes since we only store bytes in the HAMT
 68        val: bytes = self.hamt[key]  # type: ignore
 69        return prototype.buffer.from_bytes(val)
 70
 71        # Hypothetical code for supporting partial writes, but there is not much point since IPFS itself doesn't support partial write and reads
 72        # Untested! If for some reason this is being uncommented and then used in the future, this needs to be tested
 73        # subset: bytes
 74        # match byte_range:
 75        #     case None:
 76        #         subset = val
 77        #     case zarr.abc.store.RangeByteRequest:
 78        #         subset = val[byte_range.start : byte_range.end]
 79        #     case zarr.abc.store.OffsetByteRequest:
 80        #         subset = val[byte_range.offset :]
 81        #     case zarr.abc.store.SuffixByteRequest:
 82        #         subset = val[-byte_range.suffix :]
 83
 84    async def get_partial_values(
 85        self,
 86        prototype: zarr.core.buffer.BufferPrototype,
 87        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
 88    ) -> list[zarr.core.buffer.Buffer | None]:
 89        raise NotImplementedError
 90
 91    async def exists(self, key: str) -> bool:
 92        return key in self.hamt
 93
 94    @property
 95    def supports_writes(self) -> bool:
 96        return not self.hamt.read_only
 97
 98    @property
 99    def supports_partial_writes(self) -> bool:
100        return False
101
102    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
103        self.hamt[key] = value.to_bytes()
104
105    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
106        if key not in self.hamt:
107            await self.set(key, value)
108
109    async def set_partial_values(
110        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
111    ) -> None:
112        raise NotImplementedError
113
114    @property
115    def supports_deletes(self) -> bool:
116        return not self.hamt.read_only
117
118    async def delete(self, key: str) -> None:
119        try:
120            del self.hamt[key]
121        # It's fine if the key was not in the HAMT
122        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason
123        except KeyError:
124            return
125
126    @property
127    def supports_listing(self) -> bool:
128        return True
129
130    async def list(self) -> AsyncIterator[str]:
131        for key in self.hamt:
132            yield key
133
134    async def list_prefix(self, prefix: str) -> AsyncIterator:
135        for key in self.hamt:
136            if key.startswith(prefix):
137                yield key
138
139    async def list_dir(self, prefix: str) -> AsyncIterator:
140        for key in self.hamt:
141            if key.startswith(prefix):
142                suffix = key[len(prefix) :]
143                first_slash = suffix.find("/")
144                if first_slash == -1:
145                    yield suffix
146                else:
147                    name = suffix[0:first_slash]
148                    yield name

While Zarr v2 can use a generic key-value map (MutableMapping) that HAMT already conforms to, Zarr v3s require storage classes to conform to a new abstract class. IPFSZarr3 just wraps over a HAMT to provide this compatibility.

An example of how to write and read a zarr, using xarray, is provided below.

Write and get CID

import xarray as xr
from py_hamt import IPFSStore, HAMT, IPFSZarr3

ds = ... # some xarray Dataset
ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore()))
xr.to_zarr(store=ipfszarr3)
print(ipfszarr3.hamt.root_node_id) # The CID of the root, which is used for reading

Read from CID

import xarray as xr
from multiformats import CID
from py_hamt import IPFSStore, HAMT, IPFSZarr3

cid = CID.decode("...") # the CID for the HAMT root
ipfszarr3 = IPFSZarr3(HAMT(store=IPFSStore(), root_node_id=cid), read_only=True)
ds = xr.open_zarr(store=ipfszarr3)
print(ds)
IPFSZarr3(hamt: HAMT, read_only: bool = False)
42    def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
43        super().__init__(read_only=read_only)
44        self.hamt = hamt
45        if read_only:
46            self.hamt.make_read_only()
47        else:
48            self.hamt.enable_write()
hamt: HAMT

The internal HAMT. Safe to read the CID from, if done doing operations.

read_only: bool
50    @property
51    def read_only(self) -> bool:
52        return self.hamt.read_only

Is the store read-only?

async def get( self, key: str, prototype: zarr.core.buffer.core.BufferPrototype, byte_range: zarr.abc.store.RangeByteRequest | zarr.abc.store.OffsetByteRequest | zarr.abc.store.SuffixByteRequest | None = None) -> zarr.core.buffer.core.Buffer | None:
59    async def get(
60        self,
61        key: str,
62        prototype: zarr.core.buffer.BufferPrototype,
63        byte_range: zarr.abc.store.ByteRequest | None = None,
64    ) -> zarr.core.buffer.Buffer | None:
65        if key not in self.hamt:
66            return
67        # We know this value will always be bytes since we only store bytes in the HAMT
68        val: bytes = self.hamt[key]  # type: ignore
69        return prototype.buffer.from_bytes(val)
70
71        # Hypothetical code for supporting partial writes, but there is not much point since IPFS itself doesn't support partial write and reads
72        # Untested! If for some reason this is being uncommented and then used in the future, this needs to be tested
73        # subset: bytes
74        # match byte_range:
75        #     case None:
76        #         subset = val
77        #     case zarr.abc.store.RangeByteRequest:
78        #         subset = val[byte_range.start : byte_range.end]
79        #     case zarr.abc.store.OffsetByteRequest:
80        #         subset = val[byte_range.offset :]
81        #     case zarr.abc.store.SuffixByteRequest:
82        #         subset = val[-byte_range.suffix :]

Retrieve the value associated with a given key.

Parameters

key : str prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. byte_range : ByteRequest, optional ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved. - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned. - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header. - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns

Buffer

async def get_partial_values( self, prototype: zarr.core.buffer.core.BufferPrototype, key_ranges: Iterable[tuple[str, zarr.abc.store.RangeByteRequest | zarr.abc.store.OffsetByteRequest | zarr.abc.store.SuffixByteRequest | None]]) -> list[zarr.core.buffer.core.Buffer | None]:
84    async def get_partial_values(
85        self,
86        prototype: zarr.core.buffer.BufferPrototype,
87        key_ranges: Iterable[tuple[str, zarr.abc.store.ByteRequest | None]],
88    ) -> list[zarr.core.buffer.Buffer | None]:
89        raise NotImplementedError

Retrieve possibly partial values from given key_ranges.

Parameters

prototype : BufferPrototype The prototype of the output buffer. Stores may support a default buffer prototype. key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]] Ordered set of key, range pairs, a key may occur multiple times with different ranges

Returns

list of values, in the order of the key_ranges, may contain null/none for missing keys

async def exists(self, key: str) -> bool:
91    async def exists(self, key: str) -> bool:
92        return key in self.hamt

Check if a key exists in the store.

Parameters

key : str

Returns

bool

supports_writes: bool
94    @property
95    def supports_writes(self) -> bool:
96        return not self.hamt.read_only

Does the store support writes?

supports_partial_writes: bool
 98    @property
 99    def supports_partial_writes(self) -> bool:
100        return False

Does the store support partial writes?

async def set(self, key: str, value: zarr.core.buffer.core.Buffer) -> None:
102    async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
103        self.hamt[key] = value.to_bytes()

Store a (key, value) pair.

Parameters

key : str value : Buffer

async def set_if_not_exists(self, key: str, value: zarr.core.buffer.core.Buffer) -> None:
105    async def set_if_not_exists(self, key: str, value: zarr.core.buffer.Buffer) -> None:
106        if key not in self.hamt:
107            await self.set(key, value)

Store a key to value if the key is not already present.

Parameters

key : str value : Buffer

async def set_partial_values( self, key_start_values: Iterable[tuple[str, int, bytes | bytearray | memoryview]]) -> None:
109    async def set_partial_values(
110        self, key_start_values: Iterable[tuple[str, int, BytesLike]]
111    ) -> None:
112        raise NotImplementedError

Store values at a given key, starting at byte range_start.

Parameters

key_start_values : list[tuple[str, int, BytesLike]] set of key, range_start, values triples, a key may occur multiple times with different range_starts, range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key

supports_deletes: bool
114    @property
115    def supports_deletes(self) -> bool:
116        return not self.hamt.read_only

Does the store support deletes?

async def delete(self, key: str) -> None:
118    async def delete(self, key: str) -> None:
119        try:
120            del self.hamt[key]
121        # It's fine if the key was not in the HAMT
122        # Sometimes zarr v3 calls deletes on keys that don't exist (or have already been deleted) for some reason
123        except KeyError:
124            return

Remove a key from the store

Parameters

key : str

supports_listing: bool
126    @property
127    def supports_listing(self) -> bool:
128        return True

Does the store support listing?

async def list(self) -> AsyncIterator[str]:
130    async def list(self) -> AsyncIterator[str]:
131        for key in self.hamt:
132            yield key

Retrieve all keys in the store.

Returns

AsyncIterator[str]

async def list_prefix(self, prefix: str) -> AsyncIterator:
134    async def list_prefix(self, prefix: str) -> AsyncIterator:
135        for key in self.hamt:
136            if key.startswith(prefix):
137                yield key

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters

prefix : str

Returns

AsyncIterator[str]

async def list_dir(self, prefix: str) -> AsyncIterator:
139    async def list_dir(self, prefix: str) -> AsyncIterator:
140        for key in self.hamt:
141            if key.startswith(prefix):
142                suffix = key[len(prefix) :]
143                first_slash = suffix.find("/")
144                if first_slash == -1:
145                    yield suffix
146                else:
147                    name = suffix[0:first_slash]
148                    yield name

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters

prefix : str

Returns

AsyncIterator[str]