April 29, 2024 by CodeFlowerHorn
Asynchronous SQLAlchemy in Python: A Comprehensive Guide
To increase scalability and performance, use asynchronous SqlAlchemy. Your application can carry out other tasks while it waits for database operations to finish by utilizing asynchronous I/O. In particular, for resource-intensive or high-traffic applications, this can lead to quicker response times and better system resource usage.
Prerequisite
Requirements
python3 -m pip install --upgrade sqlalchemy prettytable
Import libraries
import asyncio
from sqlalchemy import Column, Integer, String, select, update
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from prettytable import PrettyTable
Define the table for your database
Base = declarative_base()
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True)
title = Column(String)
author = Column(String)
genre = Column(String)
Create a book repository class
class BookRepository:
def __init__(self):
self.semaphore = asyncio.Semaphore(50)
async def create_table(self):
self.engine = create_async_engine('sqlite+aiosqlite:///books.db', echo=False)
self.session = async_sessionmaker(self.engine, expire_on_commit=False)
async with self.engine.begin() as engine:
await engine.run_sync(Base.metadata.create_all)
async def create_book(self, title: str, author: str, genre: str):
async with self.semaphore:
try:
book = Book(title=title, author=author, genre=genre)
async with self.session.begin() as session:
session.add(book)
return True
except:
async with self.session.begin() as session:
await session.rollback()
return False
async def get_books(self):
async with self.semaphore:
async with self.session.begin() as session:
query = select(Book)
result = await session.stream(query)
books = await result.scalars().all()
return books
async def get_book_by_id(self, id: int):
async with self.semaphore:
async with self.session.begin() as session:
query = select(Book).where(Book.id == id)
result = await session.stream(query)
book = await result.scalars().one_or_none()
return book
async def update_book(self, id: int, **fields):
async with self.semaphore:
try:
async with self.session.begin() as session:
query = update(Book).where(Book.id == id).values(fields)
await session.execute(query)
return True
except:
async with self.session.begin() as session:
await session.rollback()
return False
async def delete_book(self, book: Book):
async with self.semaphore:
async with self.session.begin() as session:
await session.delete(book)
return True
async def close(self):
await self.engine.dispose()
Create a book repository instance
book_repository = BookRepository()
Create the table for your database
await book_repository.create_table()
Create books
await book_repository.create_book("The Hobbit", "J,R,R Tolkien", "Fantasy")
await book_repository.create_book("1987", "George Orwell", "Dystopian Fiction")
await book_repository.create_book("To Kill a Mockingbird", "Harper Lee", "Southern Gothic, Bildungsroman")
Get all books
table = PrettyTable(["id", "title", "author", "genre"])
books = await book_repository.get_books()
for book in books:
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
table.clear_rows()
Get book by id
book = await book_repository.get_book_by_id(1)
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
table.clear_rows()
Update a book
fields = {
"title": "I am title",
"author": "I am author",
"genre": "I am genre"
}
await book_repository.update_book(book.id, **fields)
books = await book_repository.get_books()
for book in books:
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
table.clear_rows()
Delete a book
book = await book_repository.get_book_by_id(1)
await book_repository.delete_book(book)
books = await book_repository.get_books()
for book in books:
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
Close the database connection
await book_repository.close()
Full code
Create a file and name it main.py and copy & paste the code below
import asyncio
from sqlalchemy import Column, Integer, String, select, update
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from prettytable import PrettyTable
Base = declarative_base()
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True)
title = Column(String)
author = Column(String)
genre = Column(String)
class BookRepository:
def __init__(self):
self.semaphore = asyncio.Semaphore(50)
async def create_table(self):
self.engine = create_async_engine('sqlite+aiosqlite:///books.db', echo=False)
self.session = async_sessionmaker(self.engine, expire_on_commit=False)
async with self.engine.begin() as engine:
await engine.run_sync(Base.metadata.create_all)
async def create_book(self, title: str, author: str, genre: str):
async with self.semaphore:
try:
book = Book(title=title, author=author, genre=genre)
async with self.session.begin() as session:
session.add(book)
return True
except:
async with self.session.begin() as session:
await session.rollback()
return False
async def get_books(self):
async with self.semaphore:
async with self.session.begin() as session:
query = select(Book)
result = await session.stream(query)
books = await result.scalars().all()
return books
async def get_book_by_id(self, id: int):
async with self.semaphore:
async with self.session.begin() as session:
query = select(Book).where(Book.id == id)
result = await session.stream(query)
book = await result.scalars().one_or_none()
return book
async def update_book(self, id: int, **fields):
async with self.semaphore:
try:
async with self.session.begin() as session:
query = update(Book).where(Book.id == id).values(fields)
await session.execute(query)
return True
except:
async with self.session.begin() as session:
await session.rollback()
return False
async def delete_book(self, book: Book):
async with self.semaphore:
async with self.session.begin() as session:
await session.delete(book)
return True
async def close(self):
await self.engine.dispose()
async def main():
book_repository = BookRepository() # create repository instance
await book_repository.create_table() # create the table
# Insert book
await book_repository.create_book("The Hobbit", "J,R,R Tolkien", "Fantasy")
await book_repository.create_book("1987", "George Orwell", "Dystopian Fiction")
await book_repository.create_book("To Kill a Mockingbird", "Harper Lee", "Southern Gothic, Bildungsroman")
# Get all books
table = PrettyTable(["id", "title", "author", "genre"])
books = await book_repository.get_books()
for book in books:
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
table.clear_rows()
# Get book by id
book = await book_repository.get_book_by_id(1)
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
table.clear_rows()
# Update a book
fields = {
"title": "I am title",
"author": "I am author",
"genre": "I am genre"
}
await book_repository.update_book(book.id, **fields)
books = await book_repository.get_books()
for book in books:
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
table.clear_rows()
# Delete a book
book = await book_repository.get_book_by_id(1)
await book_repository.delete_book(book)
books = await book_repository.get_books()
for book in books:
table.add_row([book.id, book.title, book.author, book.genre])
print(table)
await book_repository.close()
if __name__ == "__main__":
asyncio.run(main())
Run the python script
python3 main.py