Last updated: 2021-04-07
We have chosen to separate the in-memory document cache from document processing. The in-memory document cache maintains document states in an LRU map, with a limit on how many documents it stores at a time. When the limit is reached, the oldest document gets evicted.
Document processing is organised as a per-document task queue, which maintains the sequential order of tasks on the same document. At any time only a single task is executed per document. It is then the task's responsibility to acquire state in whatever way it considers appropriate.
The implementation is heavily based on rxjs primitives like Observable and BehaviorSubject. A Ceramic node operates in a concurrent, asynchronous environment, which naturally leads to an event-based, reactive implementation. For example, we use an updatable feed of document state updates as a cornerstone abstraction. Vanilla EventEmitters do not compose well: one can not easily chain a few EventEmitters together, nor can they represent a feed of events. The rxjs library allows us to do both. It provides advanced primitives so that we can focus on critical infrastructure, not on low-level plumbing.
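To illustrate why composability matters, here is a minimal sketch: a feed that holds a current value (BehaviorSubject-style) plus a `mapFeed` combinator that derives a new feed from it. The names (`StateFeed`, `mapFeed`, `logLength`) are illustrative only; with rxjs, operators like this come for free, while plain EventEmitters require manual wiring every time.

```typescript
// Minimal stand-in for an rxjs BehaviorSubject: holds a current value,
// replays it to new subscribers, and pushes subsequent updates.
type Listener<T> = (value: T) => void;

interface Feed<T> {
  readonly value: T;
  subscribe(listener: Listener<T>): () => void;
}

class StateFeed<T> implements Feed<T> {
  private listeners = new Set<Listener<T>>();
  constructor(private current: T) {}
  get value(): T { return this.current; }
  next(v: T): void {
    this.current = v;
    for (const l of this.listeners) l(v);
  }
  subscribe(l: Listener<T>): () => void {
    l(this.current); // replay the current value, unlike an EventEmitter
    this.listeners.add(l);
    return () => { this.listeners.delete(l); };
  }
}

// Derive a feed of values from a feed of states: feeds compose.
function mapFeed<A, B>(source: Feed<A>, fn: (a: A) => B): Feed<B> {
  return {
    get value() { return fn(source.value); },
    subscribe(l: Listener<B>) { return source.subscribe((a) => l(fn(a))); },
  };
}

// A document state simplified to just the number of commits in its log.
interface DocState { logLength: number; }
const stateFeed = new StateFeed<DocState>({ logLength: 1 });
const logLengthFeed = mapFeed(stateFeed, (s) => s.logLength);

const seen: number[] = [];
logLengthFeed.subscribe((n) => seen.push(n));
stateFeed.next({ logLength: 2 }); // a new commit arrives
// seen: [1, 2]
```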
Disposition: documents 1 through 4, each with a queue of tasks α, β, γ (so 1α is task α on document 1), and a document cache limited to 3 entries.
Let us see what happens when 4α starts. As per the Node.js execution model, processing is event-based. After an IO event is emitted, Node.js executes all the code uninterrupted until it stumbles upon a wait for the next IO event. Ceramic tasks are heavily IO-based, so it is fair to assume that at some point task 1α starts to wait for IO, for example, for an IPFS retrieval. At that point Node.js starts handling the next event, which in our case is task 4α.
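The interleaving described above can be sketched with two async functions. The names `task1a` and `task4a` mirror the example and are illustrative only; the fake IPFS retrieval is simulated with `setImmediate`.

```typescript
// Sketch of how a task that awaits IO yields the event loop to the
// next task. Purely illustrative; not the actual Ceramic task code.
const order: string[] = [];

const fakeIpfsRetrieval = () =>
  new Promise<void>((resolve) => setImmediate(resolve));

async function task1a(): Promise<void> {
  order.push("1α: start");
  await fakeIpfsRetrieval(); // 1α yields control here
  order.push("1α: resume");
}

async function task4a(): Promise<void> {
  order.push("4α: start"); // runs while 1α waits on IO
}

const done = (async () => {
  const t1 = task1a(); // runs synchronously until the first await
  await task4a();
  await t1;
  return order;
})();
// order ends up as ["1α: start", "4α: start", "1α: resume"]
```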
When task 4α starts, the cache exceeds the limit, so it evicts the oldest document, 1. Yet this does not pause task 1α: it still runs. This raises two questions.
Q1: What happens when 1β needs to run?
With the separation of the document cache from document processing, we must ensure that a task operates on the most recent state. When the task starts, we try to load the state from memory, from the state store, or from the network - whatever works for the task at hand.
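The "whatever works" loading order can be sketched as a fallback chain. The helper names (`makeLoader`, the three sources) are hypothetical, not the actual Ceramic API:

```typescript
// Sketch of loading state from the first source that has it:
// memory, then state store, then network.
interface DocState { logLength: number; }

type Source = (docId: string) => Promise<DocState | undefined>;

function makeLoader(fromMemory: Source, fromStateStore: Source, fromNetwork: Source) {
  return async (docId: string): Promise<DocState> => {
    for (const source of [fromMemory, fromStateStore, fromNetwork]) {
      const state = await source(docId);
      if (state) return state; // first hit wins
    }
    throw new Error(`Document not found: ${docId}`);
  };
}

// Usage: memory misses, the state store hits, the network is never asked.
const load = makeLoader(
  async () => undefined,
  async () => ({ logLength: 5 }),
  async () => { throw new Error("network should not be reached"); },
);
const loaded = load("doc-1");
```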
Q2: How does memory grow under this behaviour?
When 4α runs and the document cache contains 3 entries (2, 3, 4), we have 4 document states in total, since task 1α has not released the state of document 1 to the garbage collector. Let us call this a spawned task - one that operates on a document that is no longer in the cache. When a new task appears for a new document, we end up with one more spawned task and one more instance of document state residing in memory. This way memory grows with the number of concurrent operations. To bound this growth, we added a configuration parameter concurrentTasksLimit. This is the maximum number of concurrently running tasks across all documents. When this limit is reached, the next task has to wait until one of the running tasks is done.
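The limit can be sketched as a small semaphore-like gate. This is a simplified stand-in, not the real executionQ; the class name `TaskLimiter` is hypothetical:

```typescript
// Sketch of a global concurrency limit: at most concurrentTasksLimit
// tasks run at once; waiting tasks start as running ones finish.
class TaskLimiter {
  private running = 0;
  private waiting: Array<() => void> = [];

  constructor(private readonly concurrentTasksLimit: number) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.running >= this.concurrentTasksLimit) {
      // Park until some running task completes.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    }
    this.running += 1;
    try {
      return await task();
    } finally {
      this.running -= 1;
      this.waiting.shift()?.(); // wake one parked task, if any
    }
  }
}

// Usage: limit of 2, three tasks; at most 2 are ever in flight.
const limiter = new TaskLimiter(2);
let inFlight = 0;
let maxInFlight = 0;

const task = () =>
  limiter.run(async () => {
    inFlight += 1;
    maxInFlight = Math.max(maxInFlight, inFlight);
    await new Promise((r) => setImmediate(r)); // simulated IO
    inFlight -= 1;
  });

const all = Promise.all([task(), task(), task()]);
```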
One could consider concurrentTasksLimit a ceiling for another vector of memory growth, complementary to the cache limit. This vector is related to bursts of document processing.
One of the features is the ability to subscribe to document changes. When a consumer subscribes to doctype updates, the updates should flow regardless of the pinning status. To enable that in the presence of cache eviction, we mandate that a subscribed document is not evicted. For this, we internally split the cache into two buckets: volatile and durable. Volatile is a vanilla LRU map, with cache eviction. The durable bucket maintains the list of subscribed documents. When a document is subscribed, we put its state in the durable bucket. When unsubscribed, it moves to the volatile bucket. An interesting question here is what state to put into the durable bucket. We have a few scenarios here:
So, at any time, we are sure we start a subscription with the latest available state.
As part of the state refactoring effort we have decided to maintain certain semantics for how a Stream updates its state. After creation, a Stream holds just the state it was initialized with. It only updates its state after being manually changed or when subscribed; in the latter case, the state gets continuously updated. Looking at the internals, one could notice a Stream relies on a RunningStateLike instance to maintain the state. One could see RunningStateLike as either an Observable of DocState with access to the current value, or as a BehaviorSubject with some additional getters. The reason for this is that we want to avoid explicit memory management in Stream instances. Either way, RunningStateLike is a kind of Subject that has to be explicitly closed, and as one could see from the sections above, managing the document state life cycle is complicated. A Ceramic instance performs that closing behaviour when the CeramicAPI instance itself is closed, which frees a developer from closing every Stream manually. However, the more open and subscribed Stream instances there are, the more memory is used, as having lots of subscribed Stream instances can cause the in-memory cache to grow beyond the configured max cache size.
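The closing behaviour can be sketched as follows. `MiniSubject` and `MiniCeramic` are simplified stand-ins (the real code uses rxjs Subjects and the CeramicAPI class); the point is only that closing the instance completes every subject, so no Stream has to be closed by hand:

```typescript
// Sketch: subjects must be completed to release their subscribers,
// and closing the Ceramic instance completes them all at once.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners = new Set<Listener<T>>();
  closed = false;
  subscribe(l: Listener<T>): () => void {
    this.listeners.add(l);
    return () => { this.listeners.delete(l); };
  }
  next(value: T): void {
    if (!this.closed) for (const l of this.listeners) l(value);
  }
  complete(): void {
    this.closed = true;
    this.listeners.clear(); // frees subscribers for garbage collection
  }
}

class MiniCeramic {
  private subjects: MiniSubject<number>[] = [];
  createStreamState(): MiniSubject<number> {
    const s = new MiniSubject<number>();
    this.subjects.push(s);
    return s;
  }
  close(): void {
    // Completing every subject here is what frees the developer
    // from closing each Stream manually.
    for (const s of this.subjects) s.complete();
  }
}

const ceramic = new MiniCeramic();
const a = ceramic.createStreamState();
const b = ceramic.createStreamState();
ceramic.close();
// a.closed === true && b.closed === true
```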
The main concept behind the component structure is a running document. Let us call a running document (or running state) an entity that has its state in memory, as opposed to a sleeping document whose state is stored in persistent storage. We want to make sure that there is always at most one running instance of a document. We want to provide the same access pattern for a document whether it is running, sleeping, or yet to be loaded from the network.
We start our construction with the concept of a running state. It is implemented as the class RunningState, based on BehaviorSubject. For people unfamiliar with rxjs, it is like a reference to a value - document state in our case - that one could update or subscribe to for updates.
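A minimal sketch of the idea, with the BehaviorSubject behaviour implemented inline rather than imported from rxjs (`RunningStateSketch` and the one-field `DocState` are illustrative, not the real classes):

```typescript
// A running state: a current DocState one can read, update, and
// subscribe to; new subscribers immediately receive the current value.
interface DocState { logLength: number; }
type Listener = (state: DocState) => void;

class RunningStateSketch {
  private listeners = new Set<Listener>();
  constructor(private current: DocState) {}
  get state(): DocState { return this.current; }
  next(state: DocState): void {
    this.current = state;
    for (const l of this.listeners) l(state);
  }
  subscribe(l: Listener): () => void {
    l(this.current); // BehaviorSubject semantics: replay the current value
    this.listeners.add(l);
    return () => { this.listeners.delete(l); };
  }
}

const running = new RunningStateSketch({ logLength: 1 });
const observed: number[] = [];
running.subscribe((s) => observed.push(s.logLength));
running.next({ logLength: 2 });
// observed: [1, 2]; running.state.logLength: 2
```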
Repository shepherds a list of document states, be they running or sleeping. In theory, one could suggest a design that maintains a list of raw state values. With our design based on BehaviorSubject and rxjs, one has a sort of referential transparency: an instance of RunningState could be passed around freely, always delivering the latest value. With the theoretical alternative design, we would have to worry about stale values and non-garbage-collected event emitters, which creates too much unnecessary complexity.
RunningState implements the interface RunningStateLike. It is beneficial for us to have the same interface across a few semantically different classes. While RunningState represents, sorry for the tautology, a running state, in some cases we want to provide a snapshot of a document state at a certain time or commit. This is achieved in SnapshotState, which can not be updated.
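A sketch of the snapshot idea: the same read interface as a running state, but frozen, with updates rejected. The details (`SnapshotStateSketch`, throwing on update) are an assumption for illustration; the real class may reject updates differently:

```typescript
// A snapshot: exposes a fixed DocState, emits it once to subscribers,
// and refuses updates.
interface DocState { logLength: number; }
type Listener = (state: DocState) => void;

class SnapshotStateSketch {
  constructor(private readonly snapshot: DocState) {}
  get state(): DocState { return this.snapshot; }
  subscribe(l: Listener): () => void {
    l(this.snapshot); // the frozen value, once
    return () => undefined; // nothing further will ever be emitted
  }
  next(_state: DocState): void {
    throw new Error("SnapshotState can not be updated");
  }
}

const snapshot = new SnapshotStateSketch({ logLength: 3 });
let threw = false;
try { snapshot.next({ logLength: 4 }); } catch { threw = true; }
// threw === true; snapshot.state.logLength stays 3
```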
We also have StateLink implementing RunningStateLike. Its initial state is set at construction time. It can be updated, and an update only changes the local state. It can be subscribed to, and this actually triggers a subscription to another upstream Observable (returned from Repository#updates$ in the production case). This way we achieve the behaviour outlined in the Subscriptions section, while maintaining a single responsibility for the entity. StateLink maintains state local to a Stream. The upstream observable handles Repository-specific logic.
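These semantics can be sketched as below: local state at construction, local updates via `next`, and an upstream subscription attached only when someone subscribes. `StateLinkSketch` and the `Upstream` callback shape are illustrative stand-ins for the real StateLink and Repository#updates$:

```typescript
// StateLink-like behaviour: subscription lazily wires the link to an
// upstream feed; updates from upstream flow into the local state.
interface DocState { logLength: number; }
type Listener = (state: DocState) => void;
type Upstream = (listener: Listener) => () => void;

class StateLinkSketch {
  private listeners = new Set<Listener>();
  constructor(private current: DocState, private upstream: Upstream) {}
  get state(): DocState { return this.current; }
  next(state: DocState): void { // an update only changes local state
    this.current = state;
    for (const l of this.listeners) l(state);
  }
  subscribe(l: Listener): () => void {
    // Subscribing is what triggers the upstream subscription.
    const unsubUpstream = this.upstream((state) => this.next(state));
    l(this.current);
    this.listeners.add(l);
    return () => { this.listeners.delete(l); unsubUpstream(); };
  }
}

// Usage: a hand-rolled upstream that we can push updates into.
let upstreamListeners: Listener[] = [];
const link = new StateLinkSketch({ logLength: 1 }, (l) => {
  upstreamListeners.push(l);
  return () => { upstreamListeners = upstreamListeners.filter((x) => x !== l); };
});

const seen: number[] = [];
link.subscribe((s) => seen.push(s.logLength));
upstreamListeners.forEach((l) => l({ logLength: 2 })); // repository update arrives
// seen: [1, 2]; link.state.logLength === 2
```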
Repository is the entry point for documents. It maintains StateCache as a cache of running documents (see below), and two queues. loadingQ serializes loading requests per document: at any time, there is only one loading process per docId. executionQ serializes document processing tasks: at any time, there is only one task running per document. Additionally, executionQ limits how many tasks are allowed to run concurrently in total.
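The per-document serialization both queues share can be sketched as a map of promise chains, one per docId. `PerKeyQueue` is a simplified stand-in, without the total concurrency limit:

```typescript
// Tasks for the same key run one after another; tasks for different
// keys run independently.
class PerKeyQueue {
  private tails = new Map<string, Promise<unknown>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const tail = this.tails.get(key) ?? Promise.resolve();
    // Chain after the previous task for this key, even if it failed.
    const next = tail.then(task, task);
    this.tails.set(key, next.catch(() => undefined));
    return next;
  }
}

// Usage: two tasks on "doc-1" run strictly in order.
const queue = new PerKeyQueue();
const order: string[] = [];
const slow = () => new Promise<void>((r) => setImmediate(r));

const done = Promise.all([
  queue.run("doc-1", async () => {
    order.push("1α start");
    await slow();
    order.push("1α end");
  }),
  queue.run("doc-1", async () => {
    order.push("1β start"); // only after 1α has finished
  }),
]);
// order ends as ["1α start", "1α end", "1β start"]
```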
The two most interesting methods here are load and updates$. load tries to sequentially (per docId) load a document state: from memory, from the state store, from the network. updates$ is called when one subscribes to doctype updates. It returns a new Observable that, on subscription, loads a running state (from memory, from the state store, or from a passed state), puts it into the durable part of the cache, and subscribes the downstream subscriber to the running state updates. On unsubscription, it removes the running state from the durable part of the cache if no more subscriptions are present. If there are still subscribers, the running state remains in the durable part of the cache.
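The "last subscriber releases the durable entry" part can be sketched as reference counting. `DurableTracker` is a hypothetical helper for illustration, not the actual implementation:

```typescript
// On subscribe, a docId becomes durable; on unsubscribe it is released
// back to volatile only once the last subscriber is gone.
class DurableTracker {
  private refCounts = new Map<string, number>();

  subscribe(docId: string): () => void {
    this.refCounts.set(docId, (this.refCounts.get(docId) ?? 0) + 1);
    let released = false;
    return () => {
      if (released) return; // unsubscribing twice is a no-op
      released = true;
      const count = (this.refCounts.get(docId) ?? 1) - 1;
      if (count <= 0) this.refCounts.delete(docId); // back to volatile
      else this.refCounts.set(docId, count);
    };
  }

  isDurable(docId: string): boolean {
    return this.refCounts.has(docId);
  }
}

const tracker = new DurableTracker();
const unsubA = tracker.subscribe("doc-1");
const unsubB = tracker.subscribe("doc-1");
unsubA();
const stillDurable = tracker.isDurable("doc-1"); // true: one subscriber left
unsubB();
const durableAfter = tracker.isDurable("doc-1"); // false: released
```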
StateCache abstracts over the collection of currently running states. It has two buckets: volatile - an LRU map - and durable - a manually-managed map. StateCache.get returns a state regardless of which bucket it is in. One could prevent a state from being evicted by moving it to the durable bucket via StateCache.endure. StateCache.free does the reverse. An important detail here is the semantics of the StateCache.set operation. It not only sets the value in the volatile bucket; it also updates the value in the durable bucket, if that bucket contained a value under the same key previously. We can not blindly set the value there, as then it could never be removed.
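These semantics can be sketched as follows. The LRU here is a bare-bones Map-based version (Maps iterate in insertion order) and evicts by least-recently-set rather than least-recently-used; a real LRU would also promote on get:

```typescript
// Two buckets: volatile (evicting) and durable (manually managed).
// set writes to volatile, and updates durable only if already present.
class StateCacheSketch<T> {
  private volatile = new Map<string, T>();
  private durable = new Map<string, T>();

  constructor(private readonly limit: number) {}

  set(key: string, value: T): void {
    this.volatile.delete(key); // re-insert to refresh recency
    this.volatile.set(key, value);
    if (this.volatile.size > this.limit) {
      const oldest = this.volatile.keys().next().value as string;
      this.volatile.delete(oldest);
    }
    // Update durable only if already there; never blindly add,
    // or the entry could never be removed.
    if (this.durable.has(key)) this.durable.set(key, value);
  }

  get(key: string): T | undefined {
    return this.durable.get(key) ?? this.volatile.get(key);
  }

  endure(key: string): void {
    const value = this.volatile.get(key);
    if (value !== undefined) this.durable.set(key, value);
  }

  free(key: string): void {
    const value = this.durable.get(key);
    this.durable.delete(key);
    if (value !== undefined) this.set(key, value); // back to volatile
  }
}

// Usage: an endured entry survives eviction, and set updates it.
const cache = new StateCacheSketch<number>(2);
cache.set("doc-1", 1);
cache.endure("doc-1"); // doc-1 now survives eviction
cache.set("doc-2", 2);
cache.set("doc-3", 3); // evicts doc-1 from volatile; durable keeps it
cache.set("doc-1", 10); // updates durable too, since it holds doc-1
```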
StateManager controls the way state changes. We can split the available methods roughly into two buckets:

- rewind and atTime - return snapshots of state at a commit and at a point in time, respectively,
- syncGenesis - handles document options: requests anchors, requests new state from the network, publishes the tip,
- update - handles an update from IPFS pubsub,
- applyCommit - applies a commit to a document.

When adding new features, make sure the code you put in conforms to the responsibility theme of a component: