Cody
November 26, 2024
Using SQLite and DuckDB (via Ibis) to manage persistent state in Python.
In this post, we’ll look at the what and why of dkdc-state:
from dkdc_state import State, ibis
ibis.options.interactive = True
state = State()
state._clear() # for demonstration purposes, clear all tables
state
<dkdc_state.main.State at 0x114ca6ff0>
The State class itself isn’t very interesting, but we extend it to implement a number of other stateful Python libraries. Looking at the source code:
class State:
    # initialization
    def __init__(self, dbpath: str = "data.db"):
        self.dbpath = dbpath
        self.wcon, self.rcon = self._cons()

    # reference implementation -- to be overridden
    def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):
        # create write connection
        wcon = ibis.sqlite.connect(self.dbpath)

        # create tables in write connection
        ...

        # create read connection
        rcon = ibis.duckdb.connect()

        # create tables in read connection
        for table_name in wcon.list_tables():
            rcon.read_sqlite(self.dbpath, table_name=table_name)

        # return connections
        return wcon, rcon

    def _clear(self, table_names: str | list[str] = None):
        # if table_names is None, clear all tables
        table_names = self.wcon.list_tables() if table_names is None else table_names

        # if table_names is a string, convert to a list
        if isinstance(table_names, str):
            table_names = [table_names]

        # drop views and tables
        for table_name in table_names:
            self.rcon.drop_view(table_name)
            self.wcon.drop_table(table_name)

        # reset connections (recreate tables)
        self.wcon, self.rcon = self._cons()
We define an __init__ method that calls a _cons method, which sets up a write connection (SQLite) and a read connection (DuckDB) with Ibis. A standard _clear method is defined to drop the tables (and corresponding views). That's it!
To create a fast, simple, persistent state management system, we simply need to extend the State class, implement the table creation in the _cons method, and add methods to interact with the state.
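As a minimal sketch of that pattern (the Notes class, its table, and the dt alias for ibis.expr.datatypes are illustrative assumptions, not part of dkdc-state):

import ibis
import ibis.expr.datatypes as dt

from dkdc_state import State


class Notes(State):
    # hypothetical single-table state for plain-text notes
    def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):
        # create write connection
        wcon = ibis.sqlite.connect(self.dbpath)

        # create the notes table if it doesn't exist
        self.notes_table_name = "notes"
        schema = ibis.schema({"idx": dt.timestamp, "id": str, "body": str})
        if self.notes_table_name not in wcon.list_tables():
            wcon.create_table(self.notes_table_name, schema=schema)

        # create read connection over the same SQLite file
        rcon = ibis.duckdb.connect()
        for table_name in wcon.list_tables():
            rcon.read_sqlite(self.dbpath, table_name=table_name)

        return wcon, rcon

    # a method to interact with the state
    def notes_t(self):
        return self.rcon.table(self.notes_table_name)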
Let’s look at dkdc-todo as an example:
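The setup isn't shown in the rendered output; assuming the package is imported as dkdc_todo (the module name here is a guess) and exposes the Todo class shown in the source further down, it looks something like:

from dkdc_todo import Todo  # assumed module name for the dkdc-todo package

todo = Todo()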
We can view the table (and thus the schema):
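Presumably by calling the t alias (defined in the Todo source below) with interactive mode on:

todo.t()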
┏━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ idx          ┃ id     ┃ user_id ┃ subject ┃ body   ┃ priority ┃ status ┃ description ┃ labels        ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ timestamp(6) │ string │ string  │ string  │ string │ int64    │ string │ string      │ array<string> │
└──────────────┴────────┴─────────┴─────────┴────────┴──────────┴────────┴─────────────┴───────────────┘
Then append items to the table:
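The call itself isn't shown here; reconstructing it from the append_todo signature in the source below and the record returned, it was roughly:

todo.append_todo(
    id="test",
    user_id=None,
    subject=None,
    body="do your homework",
    priority=100,
)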
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 321529),
'id': 'test',
'user_id': None,
'subject': None,
'body': 'do your homework',
'priority': 100,
'status': None,
'description': None,
'labels': None}
At this point we see our record:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id     ┃ user_id ┃ subject ┃ body             ┃ priority ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string │ string  │ string  │ string           │ int64    │ string │ string      │ array… │
├────────────────────────────┼────────┼─────────┼─────────┼──────────────────┼──────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.321529 │ test   │ NULL    │ NULL    │ do your homework │ 100      │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴────────┴─────────┴─────────┴──────────────────┴──────────┴────────┴─────────────┴────────┘
Importantly, dkdc-state tables are intended to be append-only. Once append_* (confusingly named here) is called, you must use update_* to update the record (with the same id).
To demonstrate this, let’s update that record:
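Again reconstructing the call from the update_todo signature in the source below and the record returned:

todo.update_todo(
    id="test",
    user_id=None,
    subject=None,
    body="do your homework!",
    priority=100,
)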
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 373844),
'id': 'test',
'user_id': None,
'subject': None,
'body': 'do your homework!',
'priority': 100,
'status': None,
'description': None,
'labels': None}
Then view our table:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id     ┃ user_id ┃ subject ┃ body              ┃ priority ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string │ string  │ string  │ string            │ int64    │ string │ string      │ array… │
├────────────────────────────┼────────┼─────────┼─────────┼───────────────────┼──────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.373844 │ test   │ NULL    │ NULL    │ do your homework! │ 100      │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴────────┴─────────┴─────────┴───────────────────┴──────────┴────────┴─────────────┴────────┘
There's only one row! The second row is in there; the table code is just set to show only the latest version of each record.
Let’s look at the underlying table:
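That is, reading the todos table directly from the write (SQLite) connection rather than through the deduplicating DuckDB view, something like:

todo.wcon.table(todo.todos_table_name)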
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id     ┃ user_id ┃ subject ┃ body              ┃ priority ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp                  │ string │ string  │ string  │ string            │ int64    │ string │ string      │ string │
├────────────────────────────┼────────┼─────────┼─────────┼───────────────────┼──────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.321529 │ test   │ NULL    │ NULL    │ do your homework  │ 100      │ NULL   │ NULL        │ NULL   │
│ 2024-11-27 18:02:45.373844 │ test   │ NULL    │ NULL    │ do your homework! │ 100      │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴────────┴─────────┴─────────┴───────────────────┴──────────┴────────┴─────────────┴────────┘
And we see both rows. You can add other convenience methods to the state class as needed:
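For example, get_todo (defined in the source below) returns a single record as a Python dict; the call here is a reconstruction:

todo.get_todo(id="test")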
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 373844),
'id': 'test',
'user_id': None,
'subject': None,
'body': 'do your homework!',
'priority': 100,
'status': None,
'description': None,
'labels': None}
Taking a look at the source code:
class Todo(State):
    def __init__(self, dbpath: str = None):
        if dbpath is None:
            dbpath = os.path.join(get_dkdc_dir(), "todo.db")
        super().__init__(dbpath=dbpath)

    def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):
        # create write connection
        wcon = ibis.sqlite.connect(self.dbpath)

        # create tables in write connection
        ## todos data
        self.todos_table_name = "todos"
        schema = ibis.schema(
            {
                "idx": dt.timestamp,
                "id": str,
                "user_id": str,
                "subject": str,
                "body": str,
                "priority": int,
                "status": str,
                "description": str,
                "labels": str,  # comma-separated list of labels
            }
        )
        if self.todos_table_name not in wcon.list_tables():
            wcon.create_table(self.todos_table_name, schema=schema)

        # create read connection
        rcon = ibis.duckdb.connect()

        # create tables in read connection
        for table_name in wcon.list_tables():
            rcon.read_sqlite(self.dbpath, table_name=table_name)

        return wcon, rcon

    # tables
    def todos_t(self, user_id: str = None):
        # get todos data
        t = self.rcon.table(self.todos_table_name)

        # filter by user_id
        if user_id:
            t = t.filter(t["user_id"] == user_id)

        # get only the latest metadata
        t = (
            t.mutate(
                rank=ibis.row_number().over(
                    ibis.window(
                        group_by="id",
                        order_by=ibis.desc("idx"),
                    )
                )
            )
            .filter(ibis._["rank"] == 0)
            .drop("rank")
        )

        # comma-separated lists to arrays
        t = t.mutate(labels=t["labels"].split(","))

        # order
        t = t.order_by(ibis.asc("priority"), ibis.desc("idx"))

        # return the data
        return t

    # contains
    def contains_todo(self, id: str, user_id: str = None) -> bool:
        t = self.todos_t(user_id=user_id)
        return t.filter(t["id"] == id).count().to_pyarrow().as_py() > 0

    # get record
    def get_todos(self, user_id: str = None):
        t = self.todos_t(user_id=user_id)
        return t.to_pyarrow().to_pylist()

    def get_todo(self, id: str, user_id: str = None):
        t = self.todos_t(user_id=user_id)
        return t.filter(t["id"] == id).to_pyarrow().to_pylist()[0]

    # append record
    def append_todo(
        self,
        id: str,
        user_id: str,
        subject: str,
        body: str,
        priority: int = 100,
        status: str = None,
        description: str = None,
        labels: list[str] = None,
    ):
        if self.contains_todo(id=id):
            raise ValueError(f"todo {id} already exists")

        data = {
            "idx": [now()],
            "id": [id],
            "user_id": [user_id],
            "subject": [subject],
            "body": [body],
            "priority": [priority],
            "status": [status],
            "description": [description],
            "labels": [",".join(labels) if labels else None],
        }
        self.wcon.insert(self.todos_table_name, data)

        return self.get_todo(id=id)

    # update record
    def update_todo(
        self,
        id: str,
        user_id: str,
        subject: str,
        body: str,
        priority: int = 100,
        status: str = None,
        description: str = None,
        labels: list[str] = None,
    ):
        if not self.contains_todo(id=id):
            raise ValueError(f"todo {id} does not exist")

        data = {
            "idx": [now()],
            "id": [id],
            "user_id": [user_id],
            "subject": [subject],
            "body": [body],
            "priority": [priority],
            "status": [status],
            "description": [description],
            "labels": [",".join(labels) if labels else None],
        }
        self.wcon.insert(self.todos_table_name, data)

        return self.get_todo(id=id)

    # aliases
    __call__ = get_todos
    t = todos_t
We have a simple, fast, persistent todo list.
Let’s look at dkdc-lake as another example:
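As before, the setup isn't shown; assuming the module is dkdc_lake (an assumption) and exposes the Lake class shown in the source further down:

from dkdc_lake import Lake  # assumed module name for the dkdc-lake package

lake = Lake()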
We can again view the table/schema:
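Again presumably via the t alias defined on the class:

lake.t()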
┏━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ idx          ┃ id     ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data   ┃ version ┃ status ┃ description ┃ labels        ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ timestamp(6) │ string │ string  │ string │ string   │ string   │ binary │ int64   │ string │ string      │ array<string> │
└──────────────┴────────┴─────────┴────────┴──────────┴──────────┴────────┴─────────┴────────┴─────────────┴───────────────┘
And append a record (representing a file) to it:
lake.append_file(
    user_id=None,
    path=None,
    filename="test.txt",
    filetype="txt",
    data=b"hello world",
)
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 505331),
'id': '173273056550533-f0d929c0',
'user_id': None,
'path': None,
'filename': 'test.txt',
'filetype': 'txt',
'data': b'hello world',
'version': None,
'status': None,
'description': None,
'labels': None}
Now, we see our record:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id                       ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data           ┃ version ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string                   │ string  │ string │ string   │ string   │ binary         │ int64   │ string │ string      │ array… │
├────────────────────────────┼──────────────────────────┼─────────┼────────┼──────────┼──────────┼────────────────┼─────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.505331 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world' │ NULL    │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴──────────────────────────┴─────────┴────────┴──────────┴──────────┴────────────────┴─────────┴────────┴─────────────┴────────┘
And, like before, we can update it:
lake.update_file(
    user_id=None,
    path=None,
    filename="test.txt",
    filetype="txt",
    data=b"hello world!",
    version=None,
    status=None,
    description=None,
    labels=None,
)
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 586443),
'id': '173273056550533-f0d929c0',
'user_id': None,
'path': None,
'filename': 'test.txt',
'filetype': 'txt',
'data': b'hello world!',
'version': None,
'status': None,
'description': None,
'labels': None}
Showing only one row:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id                       ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data            ┃ version ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string                   │ string  │ string │ string   │ string   │ binary          │ int64   │ string │ string      │ array… │
├────────────────────────────┼──────────────────────────┼─────────┼────────┼──────────┼──────────┼─────────────────┼─────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.586443 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world!' │ NULL    │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴──────────────────────────┴─────────┴────────┴──────────┴──────────┴─────────────────┴─────────┴────────┴─────────────┴────────┘
But again, there are two rows in the underlying SQLite table:
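Reading directly from the write connection again, something like:

lake.wcon.table(lake.lake_table_name)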
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id                       ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data            ┃ version ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp                  │ string                   │ string  │ string │ string   │ string   │ binary          │ int64   │ string │ string      │ string │
├────────────────────────────┼──────────────────────────┼─────────┼────────┼──────────┼──────────┼─────────────────┼─────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.505331 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world'  │ NULL    │ NULL   │ NULL        │ NULL   │
│ 2024-11-27 18:02:45.586443 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world!' │ NULL    │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴──────────────────────────┴─────────┴────────┴──────────┴──────────┴─────────────────┴─────────┴────────┴─────────────┴────────┘
Our view above uses a window function over idx (a timestamp) to take only the latest version of each record:
class Lake(State):
    def __init__(self, dbpath: str = None):
        if dbpath is None:
            dbpath = os.path.join(get_dkdc_dir(), "lake.db")
        super().__init__(dbpath=dbpath)

    def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):
        # create write connection
        wcon = ibis.sqlite.connect(self.dbpath)

        # create tables in write connection
        ## lake data
        self.lake_table_name = "lake"
        schema = ibis.schema(
            {
                "idx": dt.timestamp,
                "id": str,
                "user_id": str,
                "path": str,
                "filename": str,
                "filetype": str,
                "data": dt.binary,
                "version": int,
                "status": str,
                "description": str,
                "labels": str,  # comma-separated list of labels
            }
        )
        if self.lake_table_name not in wcon.list_tables():
            wcon.create_table(self.lake_table_name, schema=schema)

        # create read connection
        rcon = ibis.duckdb.connect()

        # create tables in read connection
        for table_name in wcon.list_tables():
            rcon.read_sqlite(self.dbpath, table_name=table_name)

        return wcon, rcon

    # tables
    def lake_t(self, user_id: str = None):
        # get lake data
        t = self.rcon.table(self.lake_table_name)

        # filter by user_id
        if user_id:
            t = t.filter(t["user_id"] == user_id)

        # get only the latest metadata
        t = (
            t.mutate(
                rank=ibis.row_number().over(
                    ibis.window(
                        group_by="id",
                        order_by=ibis.desc("idx"),
                    )
                )
            )
            .filter(ibis._["rank"] == 0)
            .drop("rank")
        )

        # comma-separated lists to arrays
        t = t.mutate(labels=t["labels"].split(","))

        # order
        t = t.order_by(ibis.desc("idx"))

        # return the data
        return t

    # contains
    def contains_path(self, path: str, user_id: str = None) -> bool:
        t = self.lake_t(user_id=user_id)
        return t.filter(t["path"] == path).count().to_pyarrow().as_py() > 0

    def contains_file(
        self, filename: str, path: str = None, user_id: str = None
    ) -> bool:
        t = self.lake_t(user_id=user_id)
        return (
            t.filter((t["path"] == path) & (t["filename"] == filename))
            .count()
            .to_pyarrow()
            .as_py()
            > 0
        )

    # get record
    def get_file(self, filename: str, path: str = None, user_id: str = None):
        if not self.contains_file(filename=filename, path=path, user_id=user_id):
            raise ValueError(f"File {filename} does not exist")

        t = self.lake_t(user_id=user_id)
        return (
            t.filter((t["path"] == path) & (t["filename"] == filename))
            .to_pyarrow()
            .to_pylist()[0]
        )

    # append record
    def append_file(
        self,
        user_id: str = None,
        path: str = None,
        filename: str = None,
        filetype: str = None,
        data: bytes = None,
        version: int = None,
        status: str = None,
        description: str = None,
        labels: list[str] = None,
    ):
        assert (filename is not None) and (
            data is not None
        ), "user_id, filename, and data are required"

        if self.contains_file(filename=filename, path=path, user_id=user_id):
            raise ValueError(f"File {filename} already exists")

        data = {
            "idx": [now()],
            "id": [uuid()],
            "user_id": [user_id],
            "path": [path],
            "filename": [filename],
            "filetype": [filetype],
            "data": [data],
            "version": [version],
            "status": [status],
            "description": [description],
            "labels": [",".join(labels) if labels else None],
        }
        self.wcon.insert(self.lake_table_name, data)

        return self.get_file(filename=filename, path=path, user_id=user_id)

    # update record
    def update_file(
        self,
        user_id: str,
        path: str,
        filename: str,
        filetype: str,
        data: bytes,
        version: int,
        status: str,
        description: str,
        labels: list[str] = None,
    ):
        if not self.contains_file(filename=filename, path=path, user_id=user_id):
            raise ValueError(f"File {filename} does not exist")

        f = self.get_file(filename=filename, path=path, user_id=user_id)
        id = f["id"]

        data = {
            "idx": [now()],
            "id": [id],
            "user_id": [user_id],
            "path": [path],
            "filename": [filename],
            "filetype": [filetype],
            "data": [data],
            "version": [version],
            "status": [status],
            "description": [description],
            "labels": [",".join(labels) if labels else None],
        }
        self.wcon.insert(self.lake_table_name, data)

        return self.get_file(filename=filename, path=path, user_id=user_id)

    # aliases
    __call__ = get_file
    t = lake_t
I’m not really sure how well this works in practice at scale. I know I can process a few billion rows on my laptop with DuckDB, though that was on Parquet files and not views on top of SQLite tables. I’m not sure how well concurrent users writing to the same SQLite database will work. But it should be fine?
This code is early and not the best. It will likely be improved over time. Check the respective repositories!
Most importantly for dkdc-io, the user interface is simple and useful. The details can change behind the abstraction.