
 1from .hamt import HAMT, blake3_hashfn
 2from .store import Store, DictStore, IPFSStore
 3from .zarr_encryption_transformers import create_zarr_encryption_transformers
 4from .ipfszarr3 import IPFSZarr3
 6__all__ = [
 7    "HAMT",
 8    "blake3_hashfn",
 9    "Store",
10    "DictStore",
11    "IPFSStore",
12    "create_zarr_encryption_transformers",
13    "IPFSZarr3",
class HAMT(MutableMapping):
    """
    This HAMT presents a key value interface, like a python dictionary. The only limits are that keys can only be strings, and values can only be types amenable with IPLDKind. IPLDKind is a fairly flexible data model, but do note that integers must be within the bounds of a signed 64-bit integer.

    py-hamt uses blake3 with a 32 byte wide hash by default, but to bring your own, read more in the documentation of `blake3_hashfn`.

    # Some notes about thread safety

    Modifying a HAMT rewrites every node on the path from the changed bucket up to the root, reserializing each one and saving it to the backing store again, so modifications are not thread safe. Thus, we offer a read-only mode which allows for parallel accesses, or a thread safe write enabled mode. You can see which mode a HAMT is in from the `read_only` variable. HAMTs default to write enabled mode on creation. Calling mutating operations in read only mode will raise Exceptions.

    In write enabled mode, all operations block and wait, so multiple threads can write to the HAMT but the operations will in reality happen one after the other.

    # transformer_encode, transformer_decode
    `transformer_encode` is called on a value's raw bytes after they have been encoded with dag_cbor and right before they are sent to the store. `transformer_decode` is called on a value's raw bytes right after they are loaded from the store and before they are decoded with dag_cbor. Both get access to the key that the value corresponds to.

    HAMT also sometimes sends its internal data structure objects to the store, those will not pass through these functions.

    # dunder method documentation
    These are not generated by pdoc automatically so we are including their documentation here.

    ## `__len__`
    Total number of keys. Note that this will have to scan the entire tree in order to count, which can take a while depending on the speed of the store retrieval.

    ## `__iter__`
    Generator of all string keys. When iteration starts, this snapshots the current root node ID and walks the tree from there, so subsequent sets and deletes that mutate will not be reflected in the keys returned in this iteration.

    ## `__deepcopy__`
    For use with the python copy module. This creates a new HAMT that starts from the same root node ID and uses the same store, hash function, mode, cache size limit and transformers. The only thing it does not copy over is the cache; the copy builds its own.
    ```python
    from copy import deepcopy
    hamt = # ...
    copy_hamt = deepcopy(hamt)
    ```
    """

    store: Store
    """@private"""
    # Assigned as an instance attribute in __init__, so it is not bound as a method: self.hash_fn(data) passes only `data`
    hash_fn: Callable[[bytes], bytes]
    """@private"""
    # Only important for writing, when reading this will use buckets even if they are overly big
    max_bucket_size: int
    """@private"""

    # Don't use the type alias here since this is exposed in the documentation
    root_node_id: IPLDKind
    """DO NOT modify this directly.

    This is the ID that the store returns for the root node of the HAMT. This is exposed since it is sometimes useful to reconstruct other HAMTs later if using a persistent store.

    This is really type IPLDKind, but the documentation generates this strange type instead since IPLDKind is a type union.
    """

    lock: Lock
    """
    @private
    For use in multithreading
    """

    read_only: bool
    """
    DO NOT modify this directly. This is here for you to read and check.

    You can change the read status of a HAMT through the `make_read_only` or `enable_write` functions. Both wait for the lock, so the switch only happens once any in-progress write-mode operation has finished.
    """

    transformer_encode: None | Callable[[str, bytes], bytes]
    """This function is called to transform the raw bytes of a value after it is encoded with dag_cbor but before it gets sent to the Store."""
    transformer_decode: None | Callable[[str, bytes], bytes]
    """This function is called to transform the raw bytes of a value after it is retrieved from the store but before decoding with dag_cbor to python IPLDKind type objects."""

    cache: dict[StoreID, Node]
    """@private"""
    max_cache_size_bytes: int
    """The maximum size of the internal cache of nodes, as measured by `sys.getsizeof` on the cache dict. We expect that some Stores will be high latency, so it is useful to cache parts of the datastructure.

    Set to 0 if you want no cache at all."""

    # We rely on python 3.7+'s dicts which keep insertion order of elements to do a basic LRU
    def cache_eviction_lru(self):
        """@private
        Evict the least recently used nodes until the cache is within `max_cache_size_bytes`.
        """
        # NOTE(review): sys.getsizeof is shallow -- it measures the dict's own table, not the Node
        # objects it holds, and in CPython a dict's table does not shrink when entries are deleted.
        # Once the limit is crossed this loop may therefore empty the whole cache rather than evict
        # a single entry; confirm whether that is the intended policy.
        while getsizeof(self.cache) > self.max_cache_size_bytes:
            if len(self.cache) == 0:
                return
            # The first key in insertion order is the least recently used one
            stalest_node_id = next(iter(self.cache))
            # pop with a default so a concurrent reader evicting the same key cannot raise KeyError
            self.cache.pop(stalest_node_id, None)

    def write_node(self, node: Node) -> IPLDKind:
        """@private
        Serialize `node`, save it to the store, cache it under the returned ID and return that ID.
        """
        node_id = self.store.save_dag_cbor(node.serialize())
        self.cache[node_id] = node
        self.cache_eviction_lru()
        return node_id

    def read_node(self, node_id: IPLDKind) -> Node:
        """@private
        Return the node stored under `node_id`, serving it from the cache when possible.
        """
        # A single dict.pop is used instead of `in` + `[]` + `del`, so two readers racing on the same
        # ID in read-only mode cannot hit a KeyError; the loser simply takes the cache-miss path.
        node = self.cache.pop(node_id, None)
        if node is not None:
            # cache hit: reinsert so this key becomes the most recently used, i.e. last in insertion order
            self.cache[node_id] = node
        else:
            # cache miss
            node = Node.deserialize(self.store.load(node_id))
            self.cache[node_id] = node
            self.cache_eviction_lru()

        return node

    def __init__(
        self,
        store: Store,
        hash_fn: Callable[[bytes], bytes] = blake3_hashfn,
        read_only: bool = False,
        root_node_id: IPLDKind = None,
        max_cache_size_bytes=10_000_000,  # default to 10 megabytes
        transformer_encode: None | Callable[[str, bytes], bytes] = None,
        transformer_decode: None | Callable[[str, bytes], bytes] = None,
    ):
        self.store = store
        self.hash_fn = hash_fn

        self.cache = {}
        self.max_cache_size_bytes = max_cache_size_bytes

        self.max_bucket_size = 4
        self.read_only = read_only
        self.lock = Lock()

        self.transformer_encode = transformer_encode
        self.transformer_decode = transformer_decode

        if root_node_id is None:
            # Start a fresh tree with an empty root node saved to the store
            root_node = Node()
            self.root_node_id = self.write_node(root_node)
        else:
            self.root_node_id = root_node_id
            # Make sure the cache has our root node
            self.read_node(self.root_node_id)

    # dunder for the python deepcopy module
    def __deepcopy__(self, memo) -> "HAMT":
        # Decide once whether we hold the lock, so we release exactly what we acquired even if the
        # mode is switched while we run.
        hold_lock = not self.read_only
        if hold_lock:
            self.lock.acquire(blocking=True)
        try:
            copy_hamt = HAMT(
                self.store,
                hash_fn=self.hash_fn,
                read_only=self.read_only,
                root_node_id=self.root_node_id,
                max_cache_size_bytes=self.max_cache_size_bytes,
                transformer_encode=self.transformer_encode,
                transformer_decode=self.transformer_decode,
            )
        finally:
            if hold_lock:
                self.lock.release()

        return copy_hamt

    def make_read_only(self):
        """Disables all mutation of this HAMT. When enabled, the HAMT will throw errors if set or delete are called. When a HAMT is only in read only mode, it allows for safe multithreaded reads, increasing performance."""
        with self.lock:
            self.read_only = True

    def enable_write(self):
        """Re-enables set and delete on this HAMT. Like `make_read_only`, this waits for the lock, so it only takes effect once any in-progress write-mode operation has finished."""
        with self.lock:
            self.read_only = False

    def _reserialize_and_link(self, node_stack: list[tuple[Link, Node]]):
        """
        For internal use
        Starts from the node at the end of the list (the deepest node) and walks back to the root, saving each node to the store again so that every parent links to the new ID of its child.
        The stack is a list where the first element is the root node and the last element is the top of the stack.
        Each element in the list is a tuple where the first element is the ID from the store and the second element is the Node in python.
        If a node ends up being empty, then it is unlinked from its parent and removed from the list entirely, unless it is the root node.
        Modifies `node_stack` in place.
        """
        # Iterate in the reverse direction, imitating going deeper into a stack
        for stack_index in range(len(node_stack) - 1, -1, -1):
            old_store_id, node = node_stack[stack_index]

            # Set and delete mutate the very Node objects that read_node/write_node cached under their
            # old IDs, so those cache entries no longer match what the store holds for those IDs.
            # Evict them so any later read of an old ID reloads the unmodified node from the store.
            self.cache.pop(old_store_id, None)

            is_empty = node.get_buckets() == {} and node.get_links() == {}
            is_root = stack_index == 0

            if is_empty and not is_root:
                # Unlink from the rest of the tree, then drop it from the stack
                _, prev_node = node_stack[stack_index - 1]
                prev_node._remove_link(old_store_id)
                node_stack.pop(stack_index)
                continue

            # Reserialize under a new ID
            new_store_id = self.write_node(node)
            node_stack[stack_index] = (new_store_id, node)

            # Every node except the root must have its parent repointed at the new ID
            if not is_root:
                _, prev_node = node_stack[stack_index - 1]
                prev_node._replace_link(old_store_id, new_store_id)

    def __setitem__(self, key_to_insert: str, val_to_insert: IPLDKind):
        # The mode is checked while holding the lock; checking before acquiring it would let
        # make_read_only() run in between and the mutation would then proceed unlocked.
        with self.lock:
            if self.read_only:
                raise Exception("Cannot call set on a read only HAMT")

            val = dag_cbor.encode(val_to_insert)
            if self.transformer_encode is not None:
                val = self.transformer_encode(key_to_insert, val)
            val_ptr = self.store.save_raw(val)

            node_stack: list[tuple[Link, Node]] = [
                (self.root_node_id, self.read_node(self.root_node_id))
            ]

            # FIFO queue to keep track of all the KVs we need to insert
            # This is important for if any buckets overflow and thus we need to reinsert all those KVs
            kvs_queue: list[tuple[str, IPLDKind]] = [(key_to_insert, val_ptr)]

            created_change = False
            # Keep iterating until we have no more KVs to insert
            while kvs_queue:
                _, top_node = node_stack[-1]
                curr_key, curr_val = kvs_queue[0]

                raw_hash = self.hash_fn(curr_key.encode())
                map_key = str(extract_bits(raw_hash, len(node_stack) - 1, 8))

                buckets = top_node.get_buckets()
                links = top_node.get_links()

                if map_key in links and map_key in buckets:
                    raise Exception(
                        "Key in both buckets and links of the node, invariant violated"
                    )
                elif map_key in links:
                    # Descend one level and retry the same KV there
                    next_node_id = links[map_key]
                    node_stack.append((next_node_id, self.read_node(next_node_id)))
                elif map_key in buckets:
                    bucket = buckets[map_key]
                    created_change = True

                    existing_kv = next((kv for kv in bucket if curr_key in kv), None)
                    if existing_kv is not None:
                        # This bucket already has this same key, just rewrite the value
                        existing_kv[curr_key] = curr_val
                        kvs_queue.pop(0)
                    elif len(bucket) < self.max_bucket_size:
                        bucket.append({curr_key: curr_val})
                        kvs_queue.pop(0)
                    else:
                        # The bucket is full: take all of its KVs out, queue them for reinsertion and
                        # replace the bucket with a link to a new, deeper node that they reflow into.
                        for kv in bucket:
                            kvs_queue.extend(kv.items())
                        del buckets[map_key]

                        new_node = Node()
                        new_node_id = self.write_node(new_node)
                        links[map_key] = new_node_id

                        # Rerun from the top with the new queue, now one level deeper in the tree
                        node_stack.append((new_node_id, new_node))
                else:
                    # No link and no bucket: create a new bucket holding just this KV
                    buckets[map_key] = [{curr_key: curr_val}]
                    kvs_queue.pop(0)
                    created_change = True

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]

    def __delitem__(self, key: str):
        # As in __setitem__, the mode is checked under the lock to avoid racing make_read_only()
        with self.lock:
            if self.read_only:
                raise Exception("Cannot call delete on a read only HAMT")

            raw_hash = self.hash_fn(key.encode())

            node_stack: list[tuple[Link, Node]] = [
                (self.root_node_id, self.read_node(self.root_node_id))
            ]

            created_change = False
            while True:
                _, top_node = node_stack[-1]
                map_key = str(extract_bits(raw_hash, len(node_stack) - 1, 8))

                buckets = top_node.get_buckets()
                links = top_node.get_links()

                if map_key in buckets and map_key in links:
                    raise Exception(
                        "Key in both buckets and links of the node, invariant violated"
                    )
                elif map_key in buckets:
                    bucket = buckets[map_key]
                    for bucket_index, kv in enumerate(bucket):
                        if key in kv:
                            created_change = True
                            bucket.pop(bucket_index)
                            # If this bucket becomes empty then delete this dict entry for the bucket
                            if len(bucket) == 0:
                                del buckets[map_key]
                            # Stop immediately: the bucket was just shortened under the enumeration
                            break
                    break
                elif map_key in links:
                    link = links[map_key]
                    node_stack.append((link, self.read_node(link)))
                else:
                    # This key is not even in the HAMT so just exit
                    break

            # Finally, reserialize and fix all links, deleting empty nodes as needed
            if created_change:
                self._reserialize_and_link(node_stack)
                self.root_node_id = node_stack[0][0]

        if not created_change:
            raise KeyError(key)

    def __getitem__(self, key: str) -> IPLDKind:
        # Decide once whether we hold the lock, so we never release a lock we did not acquire if
        # enable_write() runs while we are reading in read-only mode.
        hold_lock = not self.read_only
        if hold_lock:
            self.lock.acquire(blocking=True)
        try:
            raw_hash = self.hash_fn(key.encode())

            # Track "found" separately instead of checking for None, since None is a possible value the HAMT can store
            result_ptr: IPLDKind = None
            found_a_result = False

            node_id = self.root_node_id
            depth = 0
            while True:
                top_node = self.read_node(node_id)
                map_key = str(extract_bits(raw_hash, depth, 8))

                # Check if the key is in one of the buckets
                buckets = top_node.get_buckets()
                if map_key in buckets:
                    for kv in buckets[map_key]:
                        if key in kv:
                            result_ptr = kv[key]
                            found_a_result = True
                            break
                if found_a_result:
                    break

                # If it isn't in one of the buckets, follow the link to the next node, if any
                links = top_node.get_links()
                if map_key not in links:
                    # Nowhere left to go, stop walking down the tree
                    break
                node_id = links[map_key]
                depth += 1
        finally:
            if hold_lock:
                self.lock.release()

        if not found_a_result:
            raise KeyError(key)

        result_bytes = self.store.load(result_ptr)
        if self.transformer_decode is not None:
            result_bytes = self.transformer_decode(key, result_bytes)
        return dag_cbor.decode(result_bytes)

    def __len__(self) -> int:
        # Requires a full walk of the tree, see the class documentation
        return sum(1 for _ in self)

    def __iter__(self):
        # Snapshot the root ID (under the lock in write mode); the walk itself runs without the lock
        if self.read_only:
            root_id = self.root_node_id
        else:
            with self.lock:
                root_id = self.root_node_id

        node_id_stack: list[Link] = [root_id]
        while node_id_stack:
            node = self.read_node(node_id_stack.pop())

            # Collect all keys from all buckets
            for bucket in node.get_buckets().values():
                for kv in bucket:
                    yield from kv

            # Traverse down list of links
            node_id_stack.extend(node.get_links().values())

    def ids(self):
        """Generator of all IDs the backing store uses: the ID of every node in the tree, plus every value pointer stored in the nodes' buckets."""
        # Snapshot the root ID (under the lock in write mode); the walk itself runs without the lock
        if self.read_only:
            root_id = self.root_node_id
        else:
            with self.lock:
                root_id = self.root_node_id

        node_id_stack: list[Link] = [root_id]
        while node_id_stack:
            top_id = node_id_stack.pop()
            yield top_id
            top_node = self.read_node(top_id)

            for bucket in top_node.get_buckets().values():
                for kv in bucket:
                    yield from kv.values()

            # Traverse down the links to child nodes
            node_id_stack.extend(top_node.get_links().values())

