April 30, 2024 by CodeFlowerHorn

Step-by-Step Guide: SQLAlchemy Integration with FastAPI


After covering APIs and databases in our last post, we now want to combine them smoothly into a reliable and effective solution that lets data be stored, retrieved, and manipulated as needed. This is a great starting point for developing practical applications.

Prerequisite

Ubuntu requirements
                                    sudo apt install uvicorn -y
python3 -m pip install --upgrade sqlalchemy aiosqlite fastapi[all]
                                
Windows requirements
                                    python3 -m pip install uvicorn
python3 -m pip install --upgrade sqlalchemy aiosqlite fastapi[all]
                                
Import libraries
                                    import asyncio
import uvicorn

from fastapi import FastAPI, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import Column, Integer, String, select, update 
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker  

from contextlib import asynccontextmanager
                                
Define the table for your database
                                    Base = declarative_base()  # shared declarative base; Book below inherits from it
class Book(Base):
    """ORM model mapped to the ``books`` table."""

    __tablename__ = "books"

    # Integer primary key identifying a book row.
    id = Column(Integer, primary_key=True)
    # Plain string columns; no length or nullability constraints are declared.
    title = Column(String)
    author = Column(String)
    genre = Column(String)
                                
Create a book repository class
                                    class BookRepository:
    def __init__(self):  
        self.semaphore = asyncio.Semaphore(50)  
        self.engine = create_async_engine('sqlite+aiosqlite:///books.db', echo=False)
        self.session = async_sessionmaker(self.engine, expire_on_commit=False)

    async def create_table(self): 
        async with self.engine.begin() as engine:  
            await engine.run_sync(Base.metadata.create_all)    
        
    async def create_book(self, title: str, author: str, genre: str):
        async with self.semaphore:
            try:
                book = Book(title=title, author=author, genre=genre)
                async with self.session.begin() as session:
                    session.add(book)
                return True
            except:
                async with self.session.begin() as session:
                    await session.rollback()
                return False
            
    async def get_books(self):
        async with self.semaphore:
            async with self.session.begin() as session:
                query = select(Book)
                result = await session.stream(query)
                books = await result.scalars().all()
            
            return books

    async def get_book_by_id(self, id: int):
        async with self.semaphore:
            async with self.session.begin() as session:
                query = select(Book).where(Book.id == id)
                result = await session.stream(query)
                book = await result.scalars().one_or_none()
            
            return book

    async def update_book(self, id: int, **fields): 
        async with self.semaphore:
            try:
                async with self.session.begin() as session:
                    query = update(Book).where(Book.id == id).values(fields)
                    await session.execute(query)
                return True
            except:
                async with self.session.begin() as session:
                    await session.rollback()
                return False

    async def delete_book(self, book: Book):
        async with self.semaphore:
            async with self.session.begin() as session:
                await session.delete(book)

            return True

    async def close(self):
        await self.engine.dispose()
                                
Define your HTTP endpoints
                                    class Api:
    def __init__(self, app: FastAPI) -> None:
        self.repo = BookRepository()
        self.app = app    

    def api(self):
        @self.app.get("/book/{id}", status_code=status.HTTP_200_OK, tags=["Book"])
        async def get_book(id: int):
            book = await self.repo.get_book_by_id(id)
            return book

        @self.app.get("/books", status_code=status.HTTP_200_OK, tags=["Book"])
        async def get_books():
            books = await self.repo.get_books()
            return books
            
        @self.app.post("/book", status_code=status.HTTP_201_CREATED, tags=["Book"])
        async def create_book(title: str, author: str, genre: str):
            ret = await self.repo.create_book(title=title, author=author, genre=genre)
            return ret

        @self.app.put("/book/{id}", status_code=status.HTTP_200_OK, tags=["Book"])
        async def update_book(id: int, title: str = None, author: str = None, genre: str = None):
            fields = {
                "title": title,
                "author": author,
                "genre": genre
            }
            ret = await self.repo.update_book(id, **fields)
            return ret

        @self.app.delete("/book/{id}", status_code=status.HTTP_204_NO_CONTENT, tags=["Book"])
        async def delete_book(id: int):
            book = await self.repo.get_book_by_id(id)
            await self.repo.delete_book(book)
                                
Start up
The code before `yield` is executed before the application starts up; the code after `yield` is executed when it shuts down.
                                    @asynccontextmanager
async def lifespan(app: FastAPI):
    await BookRepository().create_table()
    yield
    await BookRepository().close()
                                
Variables
                                    app = FastAPI(

title="Books API",
    description="A simple api for CRUD operations",
    version="1.0",
    swagger_ui_parameters={ "defaultModelsExpandDepth": -1 },
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

Api(app).api()
                                
Entrypoint
                                    if __name__ == "__main__": 
    uvicorn.run(app, host="0.0.0.0", port=8000)
                                
Full code
Create a file named main.py, then copy and paste the code below into it.
                                    import asyncio
import uvicorn

from fastapi import FastAPI, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import Column, Integer, String, select, update 
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker  

from contextlib import asynccontextmanager 

# Shared declarative base; Book below inherits from it.
Base = declarative_base()  
class Book(Base):
    """ORM model mapped to the ``books`` table."""

    __tablename__ = "books"

    # Integer primary key identifying a book row.
    id = Column(Integer, primary_key=True)
    # Plain string columns; no length or nullability constraints are declared.
    title = Column(String)
    author = Column(String)
    genre = Column(String)  

class BookRepository:
    """Async data-access layer for ``Book`` rows stored in ``books.db``.

    Every operation opens its own short-lived session from the session
    factory and runs under a semaphore that caps concurrent operations at 50.
    The ``sqlite+aiosqlite`` URL means the ``aiosqlite`` driver must be
    installed.
    """

    def __init__(self):
        # Upper bound on repository operations running at the same time.
        self.semaphore = asyncio.Semaphore(50)
        self.engine = create_async_engine('sqlite+aiosqlite:///books.db', echo=False)
        # Session *factory*, not a session. expire_on_commit=False keeps
        # loaded attributes readable after the session has committed.
        self.session = async_sessionmaker(self.engine, expire_on_commit=False)

    async def create_table(self):
        """Create the tables declared on ``Base.metadata``."""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    async def create_book(self, title: str, author: str, genre: str):
        """Insert a new book.

        Returns:
            True when the row was committed, False when the insert failed.
        """
        async with self.semaphore:
            try:
                book = Book(title=title, author=author, genre=genre)
                # ``begin()`` commits on a clean exit and rolls back on its
                # own when the body raises, so no explicit rollback is needed.
                async with self.session.begin() as session:
                    session.add(book)
            except Exception:
                # ``Exception`` (not a bare except) lets task cancellation
                # and interpreter-exit signals propagate.
                return False
            return True

    async def get_books(self):
        """Return every stored book as a list of ``Book`` objects."""
        async with self.semaphore:
            async with self.session.begin() as session:
                result = await session.scalars(select(Book))
                return result.all()

    async def get_book_by_id(self, id: int):
        """Return the book whose primary key is ``id``, or None if absent."""
        async with self.semaphore:
            async with self.session.begin() as session:
                query = select(Book).where(Book.id == id)
                result = await session.scalars(query)
                return result.one_or_none()

    async def update_book(self, id: int, **fields):
        """Update columns of the book with the given id.

        Args:
            id: Primary key of the book to update.
            **fields: Column names mapped to their new values.

        Returns:
            True when the UPDATE was committed (this does not check whether
            any row matched ``id``); False when no fields were given or the
            statement failed.
        """
        if not fields:
            # An UPDATE without any SET values is not a valid statement.
            return False
        async with self.semaphore:
            try:
                async with self.session.begin() as session:
                    query = update(Book).where(Book.id == id).values(fields)
                    await session.execute(query)
            except Exception:
                # The transaction was already rolled back by ``begin()``.
                return False
            return True

    async def delete_book(self, book: Book):
        """Delete ``book`` from the database.

        Returns:
            True when the delete was committed, False when ``book`` is None
            (for example the result of a lookup that found nothing).
        """
        if book is None:
            return False
        async with self.semaphore:
            async with self.session.begin() as session:
                await session.delete(book)
        return True

    async def close(self):
        """Dispose of the engine held by this repository."""
        await self.engine.dispose()

class Api:
    """Registers the book CRUD routes on a FastAPI application."""

    def __init__(self, app: FastAPI) -> None:
        # Each Api instance owns its own repository (and therefore engine).
        self.repo = BookRepository()
        self.app = app

    def api(self):
        """Attach the ``/book`` and ``/books`` route handlers to ``self.app``."""
        # Local import: HTTPException is not part of the module-level
        # ``from fastapi import ...`` line.
        from fastapi import HTTPException

        @self.app.get("/book/{id}", status_code=status.HTTP_200_OK, tags=["Book"])
        async def get_book(id: int):
            # Responds with null when no book has this id.
            return await self.repo.get_book_by_id(id)

        @self.app.get("/books", status_code=status.HTTP_200_OK, tags=["Book"])
        async def get_books():
            return await self.repo.get_books()

        @self.app.post("/book", status_code=status.HTTP_201_CREATED, tags=["Book"])
        async def create_book(title: str, author: str, genre: str):
            # The boolean result of the repository call is the response body.
            return await self.repo.create_book(title=title, author=author, genre=genre)

        @self.app.put("/book/{id}", status_code=status.HTTP_200_OK, tags=["Book"])
        async def update_book(id: int, title: str = None, author: str = None, genre: str = None):
            candidates = {"title": title, "author": author, "genre": genre}
            # Omitted query parameters arrive as None; forwarding them would
            # overwrite the stored values with NULL.
            changes = {name: value for name, value in candidates.items() if value is not None}
            if not changes:
                # Nothing was supplied, so there is nothing to update.
                return False
            return await self.repo.update_book(id, **changes)

        @self.app.delete("/book/{id}", status_code=status.HTTP_204_NO_CONTENT, tags=["Book"])
        async def delete_book(id: int):
            book = await self.repo.get_book_by_id(id)
            if book is None:
                # Without this guard None would be handed to the repository
                # and the request would fail with a server error.
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Book not found")
            await self.repo.delete_book(book)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the database tables on startup and release the engine on shutdown.

    A single repository is used for both phases, so the engine that created
    the tables is the one that gets disposed. The repository that ``Api``
    builds for serving requests is a separate instance and is not closed here.
    """
    repo = BookRepository()
    try:
        await repo.create_table()
        # Control returns to FastAPI here; the app serves requests until shutdown.
        yield
    finally:
        # Runs on normal shutdown and also when startup or the app raised.
        await repo.close()

app = FastAPI(
    title="Books API",
    description="A simple api for CRUD operations",
    version="1.0",
    swagger_ui_parameters={"defaultModelsExpandDepth": -1},
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    # Credentialed requests must not be combined with wildcard origins,
    # methods, or headers. None of the endpoints in this file read cookies
    # or auth headers, so credentials stay disabled.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the CRUD routes on the application.
Api(app).api()

if __name__ == "__main__":
    # Serve the app directly when this file is executed as a script.
    uvicorn.run(app, host="0.0.0.0", port=8000)
                                
Run the Python script on Ubuntu
                                    uvicorn main:app --host 0.0.0.0 --port 8000 --reload
                                
Run the Python script on Windows
                                    python3 -m uvicorn main:app --host 0.0.0.0 --port 8000 --reload