fast, simple, persistent state management with Python (Ibis)

python
ibis
sqlite
duckdb
Author

Cody

Published

November 26, 2024

Using SQLite and DuckDB (via Ibis) to manage persistent state in Python.

dkdc-state

In this post, we’ll look at the what and why of dkdc-state:

from dkdc_state import State, ibis

ibis.options.interactive = True

state = State()
state._clear() # for demonstration purposes, clear all tables
state
<dkdc_state.main.State at 0x114ca6ff0>

The State class itself isn’t very interesting, but we extend it to implement a number of other stateful Python libraries. Looking at the source code:

State class code
from website.utils import get_class_source

get_class_source(State)
                                                                                                                   
 class State:                                                                                                      
     # initialization                                                                                              
     def __init__(self, dbpath: str = "data.db"):                                                                  
         self.dbpath = dbpath                                                                                      
                                                                                                                   
         self.wcon, self.rcon = self._cons()                                                                       
                                                                                                                   
     # reference implementation -- to be overridden                                                                
     def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):                                                      
         # create write connection                                                                                 
         wcon = ibis.sqlite.connect(self.dbpath)                                                                   
                                                                                                                   
         # create tables in write connection                                                                       
         ...                                                                                                       
                                                                                                                   
         # create read connection                                                                                  
         rcon = ibis.duckdb.connect()                                                                              
                                                                                                                   
         # create tables in read connection                                                                        
         for table_name in wcon.list_tables():                                                                     
             rcon.read_sqlite(self.dbpath, table_name=table_name)                                                  
                                                                                                                   
         # return connections                                                                                      
         return wcon, rcon                                                                                         
                                                                                                                   
     def _clear(self, table_names: str | list[str] = None):                                                        
         # if table_names is None, clear all tables                                                                
         table_names = self.wcon.list_tables() if table_names is None else table_names                             
                                                                                                                   
         # if table_names is a string, convert to a list                                                           
         if isinstance(table_names, str):                                                                          
             table_names = [table_names]                                                                           
                                                                                                                   
         # drop views and tables                                                                                   
         for table_name in table_names:                                                                            
             self.rcon.drop_view(table_name)                                                                       
             self.wcon.drop_table(table_name)                                                                      
                                                                                                                   
         # reset connections (recreate tables)                                                                     
         self.wcon, self.rcon = self._cons()                                                                       
                                                                                                                   

We define an __init__ method that calls a _cons method, which sets up a write connection (SQLite) and a read connection (DuckDB) with Ibis. A standard _clear method drops the tables (and their corresponding views). That’s it!

To create a fast, simple, persistent state management system, we extend the State class, implement table creation in the _cons method, and add methods to interact with the state.
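The extension pattern can be sketched without any dependencies. The example below is a simplified stand-in, not the dkdc-state implementation: it uses Python's built-in sqlite3 module with a single connection instead of Ibis with separate write and read connections, and the names MiniState, Notes, append_note, and get_notes are hypothetical.

```python
import sqlite3


class MiniState:
    """Hypothetical stand-in for State, using only the standard library."""

    def __init__(self, dbpath: str = ":memory:"):
        self.dbpath = dbpath
        self.con = self._cons()

    def _cons(self) -> sqlite3.Connection:
        # reference implementation: subclasses create their tables here
        return sqlite3.connect(self.dbpath)


class Notes(MiniState):
    """Hypothetical subclass: one table plus two methods to interact with it."""

    def _cons(self) -> sqlite3.Connection:
        con = super()._cons()
        # create the table only if it does not exist yet
        con.execute(
            "CREATE TABLE IF NOT EXISTS notes ("
            "idx INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT, body TEXT)"
        )
        return con

    def append_note(self, id: str, body: str) -> None:
        with self.con:  # commit on success, roll back on error
            self.con.execute(
                "INSERT INTO notes (id, body) VALUES (?, ?)", (id, body)
            )

    def get_notes(self) -> list:
        # rows come back as (id, body) tuples in insertion order
        return self.con.execute(
            "SELECT id, body FROM notes ORDER BY idx"
        ).fetchall()


notes = Notes()
notes.append_note("a", "first note")
notes.append_note("b", "second note")
print(notes.get_notes())
```

The base class owns connection setup, and the subclass only adds its schema and a few methods, which mirrors the division of labour described above.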

dkdc-todo

Let’s look at dkdc-todo as an example:

from dkdc_todo import Todo

todo = Todo(dbpath="data.db")
todo
<dkdc_todo.main.Todo at 0x111911730>

We can view the table (and thus the schema):

todo.t()
┏━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ idx          ┃ id     ┃ user_id ┃ subject ┃ body   ┃ priority ┃ status ┃ description ┃ labels        ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ timestamp(6) │ string │ string  │ string  │ string │ int64    │ string │ string      │ array<string> │
└──────────────┴────────┴─────────┴─────────┴────────┴──────────┴────────┴─────────────┴───────────────┘

Then append items to the table:

todo.append_todo(id="test", user_id=None, subject=None, body="do your homework")
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 321529),
 'id': 'test',
 'user_id': None,
 'subject': None,
 'body': 'do your homework',
 'priority': 100,
 'status': None,
 'description': None,
 'labels': None}

At this point we see our record:

todo.t()
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id     ┃ user_id ┃ subject ┃ body             ┃ priority ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string │ string  │ string  │ string           │ int64    │ string │ string      │ array… │
├────────────────────────────┼────────┼─────────┼─────────┼──────────────────┼──────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.321529 │ test   │ NULL    │ NULL    │ do your homework │      100 │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴────────┴─────────┴─────────┴──────────────────┴──────────┴────────┴─────────────┴────────┘

Importantly, dkdc-state tables are intended to be append-only: rows are never modified in place. Once append_* (confusingly named here) has created a record, calling it again with the same id raises an error; you must use update_*, which inserts a new row with the same id.
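The append/update contract can be sketched in a few lines. This is a simplified illustration using Python's built-in sqlite3 module rather than Ibis; the three-column table and the helper names contains and _insert are assumptions made for the example. Both public functions insert a new row, and they differ only in the existence check that guards them.

```python
import sqlite3
from datetime import datetime

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE todos (idx TEXT, id TEXT, body TEXT)")


def contains(id: str) -> bool:
    # True if at least one version of the record exists
    query = "SELECT COUNT(*) FROM todos WHERE id = ?"
    return con.execute(query, (id,)).fetchone()[0] > 0


def _insert(id: str, body: str) -> None:
    # every write is an INSERT; existing rows are never modified
    idx = datetime.now().isoformat()
    with con:
        con.execute("INSERT INTO todos VALUES (?, ?, ?)", (idx, id, body))


def append_todo(id: str, body: str) -> None:
    if contains(id):
        raise ValueError(f"todo {id} already exists")
    _insert(id, body)


def update_todo(id: str, body: str) -> None:
    if not contains(id):
        raise ValueError(f"todo {id} does not exist")
    _insert(id, body)


append_todo("test", "do your homework")
update_todo("test", "do your homework!")

try:
    append_todo("test", "a second append with the same id")
except ValueError as err:
    print(err)
```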

To demonstrate this, let’s update that record:

todo.update_todo(id="test", user_id=None, subject=None, body="do your homework!")
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 373844),
 'id': 'test',
 'user_id': None,
 'subject': None,
 'body': 'do your homework!',
 'priority': 100,
 'status': None,
 'description': None,
 'labels': None}

Then view our table:

todo.t()
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id     ┃ user_id ┃ subject ┃ body              ┃ priority ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string │ string  │ string  │ string            │ int64    │ string │ string      │ array… │
├────────────────────────────┼────────┼─────────┼─────────┼───────────────────┼──────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.373844 │ test   │ NULL    │ NULL    │ do your homework! │      100 │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴────────┴─────────┴─────────┴───────────────────┴──────────┴────────┴─────────────┴────────┘

There’s only one row! Both rows are still stored; the table method is just set to show only the latest version of each record.
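The "latest version only" behaviour can be sketched with a window function. The example below is a minimal stand-in that uses Python's built-in sqlite3 module and plain SQL rather than Ibis; the three-column table and the "other" row are made up for illustration, and window functions require SQLite 3.25 or newer. Note that SQL's ROW_NUMBER() starts at 1, whereas ibis.row_number() starts at 0.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE todos (idx TEXT, id TEXT, body TEXT)")
con.executemany(
    "INSERT INTO todos VALUES (?, ?, ?)",
    [
        ("2024-11-27 18:02:45.321529", "test", "do your homework"),
        ("2024-11-27 18:02:45.373844", "test", "do your homework!"),
        ("2024-11-27 18:05:00.000000", "other", "water the plants"),
    ],
)

# number the versions of each id from newest to oldest, then keep the newest
LATEST = """
SELECT idx, id, body
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY idx DESC) AS rn
    FROM todos
)
WHERE rn = 1
ORDER BY id
"""

latest = con.execute(LATEST).fetchall()
for row in latest:
    print(row)
```

The timestamps are stored as text in one fixed format, so ordering them as strings matches chronological order.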

Let’s look at the underlying table:

todo.wcon.table(todo.todos_table_name)
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id     ┃ user_id ┃ subject ┃ body              ┃ priority ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp                  │ string │ string  │ string  │ string            │ int64    │ string │ string      │ string │
├────────────────────────────┼────────┼─────────┼─────────┼───────────────────┼──────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.321529 │ test   │ NULL    │ NULL    │ do your homework  │      100 │ NULL   │ NULL        │ NULL   │
│ 2024-11-27 18:02:45.373844 │ test   │ NULL    │ NULL    │ do your homework! │      100 │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴────────┴─────────┴─────────┴───────────────────┴──────────┴────────┴─────────────┴────────┘

And we see both rows. You can add other convenience methods to the state class as needed:

todo.get_todo(id="test")
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 373844),
 'id': 'test',
 'user_id': None,
 'subject': None,
 'body': 'do your homework!',
 'priority': 100,
 'status': None,
 'description': None,
 'labels': None}

Taking a look at the source code:

Todo class code
get_class_source(Todo)
                                                                                                                   
 class Todo(State):                                                                                                
     def __init__(self, dbpath: str = None):                                                                       
         if dbpath is None:                                                                                        
             dbpath = os.path.join(get_dkdc_dir(), "todo.db")                                                      
         super().__init__(dbpath=dbpath)                                                                           
                                                                                                                   
     def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):                                                      
         # create write connection                                                                                 
         wcon = ibis.sqlite.connect(self.dbpath)                                                                   
                                                                                                                   
         # create tables in write connection                                                                       
         ## todos data                                                                                             
         self.todos_table_name = "todos"                                                                           
         schema = ibis.schema(                                                                                     
             {                                                                                                     
                 "idx": dt.timestamp,                                                                              
                 "id": str,                                                                                        
                 "user_id": str,                                                                                   
                 "subject": str,                                                                                   
                 "body": str,                                                                                      
                 "priority": int,                                                                                  
                 "status": str,                                                                                    
                 "description": str,                                                                               
                 "labels": str,  # comma-separated list of labels                                                  
             }                                                                                                     
         )                                                                                                         
         if self.todos_table_name not in wcon.list_tables():                                                       
             wcon.create_table(self.todos_table_name, schema=schema)                                               
                                                                                                                   
         # create read connection                                                                                  
         rcon = ibis.duckdb.connect()                                                                              
                                                                                                                   
         # create tables in read connection                                                                        
         for table_name in wcon.list_tables():                                                                     
             rcon.read_sqlite(self.dbpath, table_name=table_name)                                                  
                                                                                                                   
         return wcon, rcon                                                                                         
                                                                                                                   
     # tables                                                                                                      
     def todos_t(self, user_id: str = None):                                                                       
         # get todos data                                                                                          
         t = self.rcon.table(self.todos_table_name)                                                                
                                                                                                                   
         # filter by user_id                                                                                       
         if user_id:                                                                                               
             t = t.filter(t["user_id"] == user_id)                                                                 
                                                                                                                   
         # get only the latest metadata                                                                            
         t = (                                                                                                     
             t.mutate(                                                                                             
                 rank=ibis.row_number().over(                                                                      
                     ibis.window(                                                                                  
                         group_by="id",                                                                            
                         order_by=ibis.desc("idx"),                                                                
                     )                                                                                             
                 )                                                                                                 
             )                                                                                                     
             .filter(ibis._["rank"] == 0)                                                                          
             .drop("rank")                                                                                         
         )                                                                                                         
                                                                                                                   
         # comma-separated lists to arrays                                                                         
         t = t.mutate(labels=t["labels"].split(","))                                                               
                                                                                                                   
         # order                                                                                                   
         t = t.order_by(ibis.asc("priority"), ibis.desc("idx"))                                                    
                                                                                                                   
         # return the data                                                                                         
         return t                                                                                                  
                                                                                                                   
     # contains                                                                                                    
     def contains_todo(self, id: str, user_id: str = None) -> bool:                                                
         t = self.todos_t(user_id=user_id)                                                                         
         return t.filter(t["id"] == id).count().to_pyarrow().as_py() > 0                                           
                                                                                                                   
     # get record                                                                                                  
     def get_todos(self, user_id: str = None):                                                                     
         t = self.todos_t(user_id=user_id)                                                                         
                                                                                                                   
         return t.to_pyarrow().to_pylist()                                                                         
                                                                                                                   
     def get_todo(self, id: str, user_id: str = None):                                                             
         t = self.todos_t(user_id=user_id)                                                                         
         return t.filter(t["id"] == id).to_pyarrow().to_pylist()[0]                                                
                                                                                                                   
     # append record                                                                                               
     def append_todo(                                                                                              
         self,                                                                                                     
         id: str,                                                                                                  
         user_id: str,                                                                                             
         subject: str,                                                                                             
         body: str,                                                                                                
         priority: int = 100,                                                                                      
         status: str = None,                                                                                       
         description: str = None,                                                                                  
         labels: list[str] = None,                                                                                 
     ):                                                                                                            
         if self.contains_todo(id=id):                                                                             
             raise ValueError(f"todo {id} already exists")                                                         
                                                                                                                   
         data = {                                                                                                  
             "idx": [now()],                                                                                       
             "id": [id],                                                                                           
             "user_id": [user_id],                                                                                 
             "subject": [subject],                                                                                 
             "body": [body],                                                                                       
             "priority": [priority],                                                                               
             "status": [status],                                                                                   
             "description": [description],                                                                         
             "labels": [",".join(labels) if labels else None],                                                     
         }                                                                                                         
         self.wcon.insert(self.todos_table_name, data)                                                             
                                                                                                                   
         return self.get_todo(id=id)                                                                               
                                                                                                                   
     # update record                                                                                               
     def update_todo(                                                                                              
         self,                                                                                                     
         id: str,                                                                                                  
         user_id: str,                                                                                             
         subject: str,                                                                                             
         body: str,                                                                                                
         priority: int = 100,                                                                                      
         status: str = None,                                                                                       
         description: str = None,                                                                                  
         labels: list[str] = None,                                                                                 
     ):                                                                                                            
         if not self.contains_todo(id=id):                                                                         
             raise ValueError(f"todo {id} does not exist")                                                         
                                                                                                                   
         data = {                                                                                                  
             "idx": [now()],                                                                                       
             "id": [id],                                                                                           
             "user_id": [user_id],                                                                                 
             "subject": [subject],                                                                                 
             "body": [body],                                                                                       
             "priority": [priority],                                                                               
             "status": [status],                                                                                   
             "description": [description],                                                                         
             "labels": [",".join(labels) if labels else None],                                                     
         }                                                                                                         
         self.wcon.insert(self.todos_table_name, data)                                                             
                                                                                                                   
         return self.get_todo(id=id)                                                                               
                                                                                                                   
     # aliases                                                                                                     
     __call__ = get_todos                                                                                          
     t = todos_t                                                                                                   
                                                                                                                   

We have a simple, fast, persistent todo list.
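One detail in the Todo source above is worth isolating: labels are written to SQLite as a single comma-separated string and split back into a list on read. The sketch below reproduces that round trip in plain Python. The helper names encode_labels and decode_labels are hypothetical, and the real read path does the split inside the Ibis expression rather than in Python.

```python
from typing import Optional


def encode_labels(labels: Optional[list]) -> Optional[str]:
    # mirrors the write path: an empty or missing list is stored as NULL
    return ",".join(labels) if labels else None


def decode_labels(stored: Optional[str]) -> Optional[list]:
    # mirrors the read path: split on commas, leave NULL as None
    return stored.split(",") if stored is not None else None


print(decode_labels(encode_labels(["home", "urgent"])))
print(decode_labels(encode_labels([])))
print(decode_labels(encode_labels(["a,b"])))
```

A label that itself contains a comma does not survive the round trip, which is the usual trade-off of this encoding.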

dkdc-lake

Let’s look at dkdc-lake as another example:

from dkdc_lake import Lake

lake = Lake(dbpath="data.db")
lake
<dkdc_lake.main.Lake at 0x122fdc1d0>

We can again view the table/schema:

lake.t()
┏━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ idx          ┃ id     ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data   ┃ version ┃ status ┃ description ┃ labels        ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ timestamp(6) │ string │ string  │ string │ string   │ string   │ binary │ int64   │ string │ string      │ array<string> │
└──────────────┴────────┴─────────┴────────┴──────────┴──────────┴────────┴─────────┴────────┴─────────────┴───────────────┘

And append a record (representing a file) into it:

lake.append_file(
    user_id=None,
    path=None,
    filename="test.txt",
    filetype="txt",
    data=b"hello world",
)
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 505331),
 'id': '173273056550533-f0d929c0',
 'user_id': None,
 'path': None,
 'filename': 'test.txt',
 'filetype': 'txt',
 'data': b'hello world',
 'version': None,
 'status': None,
 'description': None,
 'labels': None}

Now, we see our record:

lake.t()
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id                       ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data           ┃ version ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string                   │ string  │ string │ string   │ string   │ binary         │ int64   │ string │ string      │ array… │
├────────────────────────────┼──────────────────────────┼─────────┼────────┼──────────┼──────────┼────────────────┼─────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.505331 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world' │    NULL │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴──────────────────────────┴─────────┴────────┴──────────┴──────────┴────────────────┴─────────┴────────┴─────────────┴────────┘

And, like before, we can update it:

lake.update_file(
    user_id=None,
    path=None,
    filename="test.txt",
    filetype="txt",
    data=b"hello world!",
    version=None,
    status=None,
    description=None,
    labels=None,
)
{'idx': datetime.datetime(2024, 11, 27, 18, 2, 45, 586443),
 'id': '173273056550533-f0d929c0',
 'user_id': None,
 'path': None,
 'filename': 'test.txt',
 'filetype': 'txt',
 'data': b'hello world!',
 'version': None,
 'status': None,
 'description': None,
 'labels': None}

The view again shows only one row:

lake.t()
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id                       ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data            ┃ version ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp(6)               │ string                   │ string  │ string │ string   │ string   │ binary          │ int64   │ string │ string      │ array… │
├────────────────────────────┼──────────────────────────┼─────────┼────────┼──────────┼──────────┼─────────────────┼─────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.586443 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world!' │    NULL │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴──────────────────────────┴─────────┴────────┴──────────┴──────────┴─────────────────┴─────────┴────────┴─────────────┴────────┘

But again, there are two rows in the underlying SQLite table:

lake.wcon.table(lake.lake_table_name)
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┓
┃ idx                        ┃ id                       ┃ user_id ┃ path   ┃ filename ┃ filetype ┃ data            ┃ version ┃ status ┃ description ┃ labels ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━┩
│ timestamp                  │ string                   │ string  │ string │ string   │ string   │ binary          │ int64   │ string │ string      │ string │
├────────────────────────────┼──────────────────────────┼─────────┼────────┼──────────┼──────────┼─────────────────┼─────────┼────────┼─────────────┼────────┤
│ 2024-11-27 18:02:45.505331 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world'  │ NULL    │ NULL   │ NULL        │ NULL   │
│ 2024-11-27 18:02:45.586443 │ 173273056550533-f0d929c0 │ NULL    │ NULL   │ test.txt │ txt      │ b'hello world!' │ NULL    │ NULL   │ NULL        │ NULL   │
└────────────────────────────┴──────────────────────────┴─────────┴────────┴──────────┴──────────┴─────────────────┴─────────┴────────┴─────────────┴────────┘

The view above uses a window function, partitioned by id and ordered by idx (a timestamp) descending, to keep only the latest version of each record:

Lake class code
get_class_source(Lake)
                                                                                                                   
 class Lake(State):                                                                                                
     def __init__(self, dbpath: str = None):                                                                       
         if dbpath is None:                                                                                        
             dbpath = os.path.join(get_dkdc_dir(), "lake.db")                                                      
         super().__init__(dbpath=dbpath)                                                                           
                                                                                                                   
     def _cons(self) -> (ibis.BaseBackend, ibis.BaseBackend):                                                      
         # create write connection                                                                                 
         wcon = ibis.sqlite.connect(self.dbpath)                                                                   
                                                                                                                   
         # create tables in write connection                                                                       
         ## lake data                                                                                              
         self.lake_table_name = "lake"                                                                             
         schema = ibis.schema(                                                                                     
             {                                                                                                     
                 "idx": dt.timestamp,                                                                              
                 "id": str,                                                                                        
                 "user_id": str,                                                                                   
                 "path": str,                                                                                      
                 "filename": str,                                                                                  
                 "filetype": str,                                                                                  
                 "data": dt.binary,                                                                                
                 "version": int,                                                                                   
                 "status": str,                                                                                    
                 "description": str,                                                                               
                 "labels": str,  # comma-separated list of labels                                                  
             }                                                                                                     
         )                                                                                                         
         if self.lake_table_name not in wcon.list_tables():                                                        
             wcon.create_table(self.lake_table_name, schema=schema)                                                
                                                                                                                   
         # create read connection                                                                                  
         rcon = ibis.duckdb.connect()                                                                              
                                                                                                                   
         # create tables in read connection                                                                        
         for table_name in wcon.list_tables():                                                                     
             rcon.read_sqlite(self.dbpath, table_name=table_name)                                                  
                                                                                                                   
         return wcon, rcon                                                                                         
                                                                                                                   
     # tables                                                                                                      
     def lake_t(self, user_id: str = None):                                                                        
         # get lake data                                                                                           
         t = self.rcon.table(self.lake_table_name)                                                                 
                                                                                                                   
         # filter by user_id                                                                                       
         if user_id:                                                                                               
             t = t.filter(t["user_id"] == user_id)                                                                 
                                                                                                                   
         # get only the latest metadata                                                                            
         t = (                                                                                                     
             t.mutate(                                                                                             
                 rank=ibis.row_number().over(                                                                      
                     ibis.window(                                                                                  
                         group_by="id",                                                                            
                         order_by=ibis.desc("idx"),                                                                
                     )                                                                                             
                 )                                                                                                 
             )                                                                                                     
             .filter(ibis._["rank"] == 0)                                                                          
             .drop("rank")                                                                                         
         )                                                                                                         
                                                                                                                   
         # comma-separated lists to arrays                                                                         
         t = t.mutate(labels=t["labels"].split(","))                                                               
                                                                                                                   
         # order                                                                                                   
         t = t.order_by(ibis.desc("idx"))                                                                          
                                                                                                                   
         # return the data                                                                                         
         return t                                                                                                  
                                                                                                                   
     # contains                                                                                                    
     def contains_path(self, path: str, user_id: str = None) -> bool:                                              
         t = self.lake_t(user_id=user_id)                                                                          
         return t.filter(t["path"] == path).count().to_pyarrow().as_py() > 0                                       
                                                                                                                   
     def contains_file(                                                                                            
         self, filename: str, path: str = None, user_id: str = None                                                
     ) -> bool:                                                                                                    
         t = self.lake_t(user_id=user_id)                                                                          
         return (                                                                                                  
             t.filter((t["path"] == path) & (t["filename"] == filename))                                           
             .count()                                                                                              
             .to_pyarrow()                                                                                         
             .as_py()                                                                                              
             > 0                                                                                                   
         )                                                                                                         
                                                                                                                   
     # get record                                                                                                  
     def get_file(self, filename: str, path: str = None, user_id: str = None):                                     
         if not self.contains_file(filename=filename, path=path, user_id=user_id):                                 
             raise ValueError(f"File {filename} does not exist")                                                   
                                                                                                                   
         t = self.lake_t(user_id=user_id)                                                                          
         return (                                                                                                  
             t.filter((t["path"] == path) & (t["filename"] == filename))                                           
             .to_pyarrow()                                                                                         
             .to_pylist()[0]                                                                                       
         )                                                                                                         
                                                                                                                   
     # append record                                                                                               
     def append_file(                                                                                              
         self,                                                                                                     
         user_id: str = None,                                                                                      
         path: str = None,                                                                                         
         filename: str = None,                                                                                     
         filetype: str = None,                                                                                     
         data: bytes = None,                                                                                       
         version: int = None,                                                                                      
         status: str = None,                                                                                       
         description: str = None,                                                                                  
         labels: list[str] = None,                                                                                 
     ):                                                                                                            
         assert (filename is not None) and (
             data is not None
         ), "filename and data are required"
                                                                                                                   
         if self.contains_file(filename=filename, path=path, user_id=user_id):                                     
             raise ValueError(f"File {filename} already exists")                                                   
                                                                                                                   
         data = {                                                                                                  
             "idx": [now()],                                                                                       
             "id": [uuid()],                                                                                       
             "user_id": [user_id],                                                                                 
             "path": [path],                                                                                       
             "filename": [filename],                                                                               
             "filetype": [filetype],                                                                               
             "data": [data],                                                                                       
             "version": [version],                                                                                 
             "status": [status],                                                                                   
             "description": [description],                                                                         
             "labels": [",".join(labels) if labels else None],                                                     
         }                                                                                                         
         self.wcon.insert(self.lake_table_name, data)                                                              
                                                                                                                   
         return self.get_file(filename=filename, path=path, user_id=user_id)                                       
                                                                                                                   
     # update record                                                                                               
     def update_file(                                                                                              
         self,                                                                                                     
         user_id: str,                                                                                             
         path: str,                                                                                                
         filename: str,                                                                                            
         filetype: str,                                                                                            
         data: bytes,                                                                                              
         version: int,                                                                                             
         status: str,                                                                                              
         description: str,                                                                                         
         labels: list[str] = None,                                                                                 
     ):                                                                                                            
         if not self.contains_file(filename=filename, path=path, user_id=user_id):                                 
             raise ValueError(f"File {filename} does not exist")                                                   
                                                                                                                   
         f = self.get_file(filename=filename, path=path, user_id=user_id)                                          
         id = f["id"]                                                                                              
                                                                                                                   
         data = {                                                                                                  
             "idx": [now()],                                                                                       
             "id": [id],                                                                                           
             "user_id": [user_id],                                                                                 
             "path": [path],                                                                                       
             "filename": [filename],                                                                               
             "filetype": [filetype],                                                                               
             "data": [data],                                                                                       
             "version": [version],                                                                                 
             "status": [status],                                                                                   
             "description": [description],                                                                         
             "labels": [",".join(labels) if labels else None],                                                     
         }                                                                                                         
         self.wcon.insert(self.lake_table_name, data)                                                              
                                                                                                                   
         return self.get_file(filename=filename, path=path, user_id=user_id)                                       
                                                                                                                   
     # aliases                                                                                                     
     __call__ = get_file                                                                                           
     t = lake_t                                                                                                    
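
The append-only write plus "latest row per id" read that Lake relies on can be sketched with nothing but Python's built-in sqlite3 module. This is a minimal illustration, not the dkdc code: the three-column table is a hypothetical, trimmed-down stand-in for the lake schema, timestamps are stored as ISO-format text so they sort correctly, and it assumes the bundled SQLite is 3.25 or newer (for window functions).

```python
import sqlite3

# Hypothetical, trimmed-down stand-in for the lake table (3 of the 11 columns).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE lake (idx TEXT, id TEXT, data BLOB)")

# Append-only writes: an "update" is just a second row with the same id
# and a newer idx timestamp.
con.executemany(
    "INSERT INTO lake VALUES (?, ?, ?)",
    [
        ("2024-11-27 18:02:45.505331", "173273056550533-f0d929c0", b"hello world"),
        ("2024-11-27 18:02:45.586443", "173273056550533-f0d929c0", b"hello world!"),
    ],
)

# Number the rows within each id, newest first, and keep only the first one.
# SQL's ROW_NUMBER() starts at 1; ibis.row_number() starts at 0, which is
# why the Lake class filters on rank == 0.
latest = con.execute(
    """
    SELECT idx, id, data
    FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY idx DESC) AS rn
        FROM lake
    )
    WHERE rn = 1
    """
).fetchall()

# The underlying table still holds every version.
total = con.execute("SELECT COUNT(*) FROM lake").fetchone()[0]
print(total, latest)
```

The full history stays in the table, so older versions remain queryable; only the view hides them.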
                                                                                                                   

looking ahead

I’m not really sure how well this works in practice at scale. I know I can process a few billion rows on my laptop with DuckDB, though that was on Parquet files and not views on top of SQLite tables. I’m not sure how well concurrent users writing to the same SQLite database will work. But it should be fine?
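
One way to start probing the concurrent-writer question is a small experiment with the standard library. The sketch below is not part of dkdc: the table name, thread count, and row count are arbitrary, and the write-ahead logging and 30-second timeout are assumptions about a reasonable setup, not settings the Lake class is known to use. SQLite allows only one writer at a time, so each thread waits for the lock instead of failing immediately.

```python
import os
import sqlite3
import tempfile
import threading

N_THREADS, N_ROWS = 4, 50  # arbitrary sizes for the experiment

# Throwaway database file; an in-memory database would not be shared
# across separate connections.
dbpath = os.path.join(tempfile.mkdtemp(), "probe.db")

# Create the table once and switch to write-ahead logging (WAL), which
# lets readers proceed while a single writer holds the write lock.
con = sqlite3.connect(dbpath)
con.execute("PRAGMA journal_mode=WAL")
con.execute("CREATE TABLE events (writer INTEGER, n INTEGER)")
con.commit()
con.close()

errors = []


def write_rows(writer: int) -> None:
    # Each thread opens its own connection. timeout is how long a writer
    # waits for the lock before raising "database is locked".
    wcon = sqlite3.connect(dbpath, timeout=30)
    try:
        for n in range(N_ROWS):
            wcon.execute("INSERT INTO events VALUES (?, ?)", (writer, n))
            wcon.commit()
    except sqlite3.OperationalError as e:
        errors.append(e)
    finally:
        wcon.close()


threads = [threading.Thread(target=write_rows, args=(i,)) for i in range(N_THREADS)]
for th in threads:
    th.start()
for th in threads:
    th.join()

con = sqlite3.connect(dbpath)
count = con.execute("SELECT COUNT(*) FROM events").fetchone()[0]
con.close()
print(count, errors)
```

If every row lands and errors stays empty, the writers were serialized without losing data. Raising the thread count, or swapping threads for separate processes, would give a rougher but more realistic picture.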

Caution

This code is early and not the best. It will likely be improved over time. Check the respective repositories!

Most importantly for dkdc-io, the user interface is simple and useful. The details can change behind the abstraction.
