02. Repository Pattern

Prerequisites

The folder structure

[1]:
!tree -I '__pycache__|build|images|plantuml*|exports|examples'
.
├── 02-repository-pattern.ipynb
├── app
│   ├── domain
│   │   └── models.py
│   ├── __init__.py
│   └── services.py
├── requirements.txt
└── tests
    ├── __init__.py
    └── test_models.py

3 directories, 7 files

Install requirements

[2]:
!cat ./requirements.txt
sqlalchemy
pytest
git+https://github.com/AllanDaemon/mypy.git@0edf1233672117c4555759c5a91461a502ddce5d

Tests

[4]:
!pytest
============================= test session starts ==============================
platform linux -- Python 3.9.1, pytest-6.1.2, py-1.10.0, pluggy-0.13.1
rootdir: /home/ykkim/notebooks/2021-msa-study/02-repository-pattern
plugins: flask-1.1.0, anyio-2.0.2
collected 12 items

tests/test_models.py ............                                        [100%]

============================== 12 passed in 0.04s ==============================
[5]:
!mypy --strict -p app
app/services.py:3: error: Cannot find implementation or library stub for module named 'app.domain.models'
app/services.py:3: note: See https://mypy.readthedocs.io/en/latest/running_mypy.html#missing-imports
app/services.py:10: error: Returning Any from function declared to return "str"
Found 2 errors in 1 file (checked 2 source files)

Persisting Our Domain Model

Reminder: Our Model

[6]:
%%plantuml

@startuml

allow_mixing
left to right direction

class Order {
    * id: str
}

class OrderLine << valueobject >> {
    * orderid: str
    * sku: str
    * qty: int
}

class Batch {
    * reference: str
    sku: str
    eta: date
    --
    _purchased_quantity: int
    _allocations: [OrderLine]
}

actor Customer
actor "Purchasing\nDept. User"

Customer --> Order : places
Order::id o--> OrderLine::orderid : "comprises\nmultiple"
Batch::_allocations o--> OrderLine
"Purchasing\nDept. User" --> Batch : "purchases"

@enduml
[6]:
../_images/guide_02_12_0.svg
  • Product

    • identified by SKU(Stock Keeping Unit)

  • Order

    • identified by an order reference

    • comprises mutliple order lines

  • OrderLine

    • has a SKU and a quantity

  • Batch

    • has a unique ID(reference), a SKU, and a quantity

    • has an ETA if they are currently shipping

      • or they may be in warehouse stock.

  • When we allocate x units of stock to a batch, the available quantity is reduced by x.

The “Normal” ORM Way: Model Depends on ORM

[7]:
%%plantuml

@startuml

left to right direction

entity Order {
    * **id**: Integer
}

entity Batch {
    * **id**: Integer
    --
    * **reference**: String(255)
    * sku: String(255)
    * qty: Integer
}

entity Allocation {
    * **batch_id**
    * **orderline_id**
}

entity OrderLine {
    * **id**: Integer
    --
    * **order_id: Integer**
    * sku: String(255)
    * qty: Integer
    eta: Date
}


Order::id ||..|{ OrderLine::order_id
OrderLine::id ||..|{ Allocation::orderline_id
Batch::id ||..|{ Allocation::batch_id


@enduml
[7]:
../_images/guide_02_15_0.svg
[8]:
from sqlalchemy import Column, ForeignKey, Integer, String, Date, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()

class _Order(Base):
    __tablename__ = 'order'
    id = Column(Integer, primary_key=True, autoincrement=True)
    orderlines = relationship('_OrderLine', collection_class=set)

class _OrderLine(Base):
    __tablename__ = 'order_line'
    id = Column(Integer, primary_key=True, autoincrement=True)
    sku = Column(String(255))
    qty = Column(Integer)
    orderid = Column(Integer, ForeignKey('order.id'))
    order = relationship(_Order)

allocation = Table('allocation', Base.metadata,
    Column('orderline_id', Integer, ForeignKey('order_line.id'), primary_key=True),
    Column('batch_id', Integer, ForeignKey('batch.id'), primary_key=True)
)

class _Batch(Base):
    __tablename__ = 'batch'
    id = Column(Integer, primary_key=True, autoincrement=True)
    reference = Column(String(255), unique=True)
    sku = Column(String(255))
    eta = Column(Date)
    _purchased_quantity = Column(Integer)
    _allocations = relationship(_OrderLine, secondary=allocation, collection_class=set)
[9]:
def init_engine(url, metadata):
    import io, sys, re, logging
    from sqlalchemy import create_engine
    logger = logging.getLogger("sqlalchemy.engine.base.Engine")
    out = io.StringIO()
    logger.addHandler(logging.StreamHandler(out))

    engine = create_engine(url, echo=True) # temporary memory db
    metadata.create_all(engine)

    print(''.join(re.findall('CREATE TABLE.*?\n\n', out.getvalue(), re.DOTALL)))
    return engine

engine = init_engine('sqlite://', Base.metadata)
CREATE TABLE "order" (
        id INTEGER NOT NULL,
        PRIMARY KEY (id)
)

CREATE TABLE batch (
        id INTEGER NOT NULL,
        reference VARCHAR(255),
        sku VARCHAR(255),
        eta DATE,
        _purchased_quantity INTEGER,
        PRIMARY KEY (id),
        UNIQUE (reference)
)

CREATE TABLE order_line (
        id INTEGER NOT NULL,
        sku VARCHAR(255),
        qty INTEGER,
        orderid INTEGER,
        PRIMARY KEY (id),
        FOREIGN KEY(orderid) REFERENCES "order" (id)
)

CREATE TABLE allocation (
        orderline_id INTEGER NOT NULL,
        batch_id INTEGER NOT NULL,
        PRIMARY KEY (orderline_id, batch_id),
        FOREIGN KEY(orderline_id) REFERENCES order_line (id),
        FOREIGN KEY(batch_id) REFERENCES batch (id)
)


Inverting the Dependency: ORM Depends on Model

Using SQLAlchemy’s “classical mapping”.

[10]:
from sqlalchemy import MetaData, Table, Column, ForeignKey, Integer, String, Date
from sqlalchemy.orm import mapper, relationship
from app.domain.models import Batch, OrderLine

metadata = MetaData()

order_line = Table(
    'order_line', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('sku', String(255)),
    Column('qty', Integer, nullable=False),
    Column('orderid', String(255)),
    extend_existing=True
)

allocation = Table('allocation', metadata,
    Column('orderline_id', Integer, ForeignKey('order_line.id'), primary_key=True),
    Column('batch_id', Integer, ForeignKey('batch.id'), primary_key=True),
    extend_existing=True
)

batch = Table(
    'batch', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('reference', String(255), unique=True),
    Column('_purchased_quantity', Integer),
    Column('sku', String(255)),
    Column('eta', Date, nullable=True),
    extend_existing=True
)

batch_mapper = mapper(Batch, batch, properties={
    '_allocations': relationship(OrderLine, secondary=allocation,
                                 collection_class=set, lazy='joined'),
},)

order_line_mapper = mapper(OrderLine, order_line)
[11]:
from contextlib import contextmanager, AbstractContextManager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session

engine = init_engine('sqlite://', metadata) # temporary memory db

SqliteSessionMaker = sessionmaker(engine, expire_on_commit=False)
SessionFactory = AbstractContextManager[Session]

@contextmanager
def session() -> SessionFactory:
    '''`with session` 블록을 이용한 자동 리소스 반환을 구현합니다.'''
    session = SqliteSessionMaker()
    try:
        yield session
    finally:
        session.close()
CREATE TABLE order_line (
        id INTEGER NOT NULL,
        sku VARCHAR(255),
        qty INTEGER NOT NULL,
        orderid VARCHAR(255),
        PRIMARY KEY (id)
)

CREATE TABLE batch (
        id INTEGER NOT NULL,
        reference VARCHAR(255),
        _purchased_quantity INTEGER,
        sku VARCHAR(255),
        eta DATE,
        PRIMARY KEY (id),
        UNIQUE (reference)
)

CREATE TABLE allocation (
        orderline_id INTEGER NOT NULL,
        batch_id INTEGER NOT NULL,
        PRIMARY KEY (orderline_id, batch_id),
        FOREIGN KEY(orderline_id) REFERENCES order_line (id),
        FOREIGN KEY(batch_id) REFERENCES batch (id)
)


[12]:
from app import mytest

mytest.init(__name__)

@mytest.unit
def test_orderline_mapper_can_load_lines(session):
    with session as db:
        db.execute('''
        INSERT INTO order_line (orderid, sku, qty) VALUES
        ('order1', 'RED-CHAIR', 12),
        ('order2', 'RED-TABLE', 13),
        ('order3', 'BLUE-LIPSTICK', 14)
        ''')
        expected = [
            OrderLine('order1', 'RED-CHAIR', 12),
            OrderLine('order2', 'RED-TABLE', 13),
            OrderLine('order3', 'BLUE-LIPSTICK', 14),
        ]
        assert expected == db.query(OrderLine).all()
test_orderline_mapper_can_load_lines
[13]:
@mytest.unit
def test_orderline_mapper_can_save_lines(session):
    with session as db:
        new_line = OrderLine('order1', 'DECORATIVE-WIDGET', 12)
        db.add(new_line)
        db.flush()

        rows = list(db.execute('SELECT orderid, sku, qty FROM order_line'))
        assert [('order1', 'DECORATIVE-WIDGET', 12)] == rows
test_orderline_mapper_can_save_lines

Introducing the Repository Pattern

  • An abstraction over persistent storage.

  • Hides details of data access by pretending that all of data is in memory.

[14]:
%%ditaa repository-pattern
+----------------------------------------+
|            Application Layer           |
+----------------------------------------+
               |^      /----------------\
               ||------|  Domain Model  |
               ||      |     Objects    |
               V|      \----------------/
+----------------------------------------+
|               Repository               |
+----------------------------------------+
                |
                V
+----------------------------------------+
|              Database Layer            |
+----------------------------------------+
[14]:
../_images/guide_02_24_0.svg

The Repository in the Abstract

[15]:
from typing import Protocol, Iterable, Set
from contextlib import ContextDecorator
import abc

class AbstractRepository(abc.ABC, ContextDecorator):

    def __enter__(self):
        return self

    def __exit__(self):
        self.close()
        return False

    @abc.abstractmethod
    def close(self, batch: Batch) -> None:
        pass

    @abc.abstractmethod
    def add(self, batch: Batch):
        raise NotImplementedError

    @abc.abstractmethod
    def get(self, reference) -> Batch:
        raise NotImplementedError

    @abc.abstractmethod
    def list(self) -> list[Batch]:
        raise NotImplementedError

    @abc.abstractmethod
    def clear(self) -> None:
        raise NotImplementedError

# Alternative approache: using `Protocol`
class RepositoryProtocol(Protocol):

    def add(self, batch: Batch) -> None:
        pass

    def get(self, reference) -> Batch:
        pass

    def list(self) -> list[Batch]:
        pass

    def clear(self) -> None:
        pass

What is the Trade-Off?

Introducing an extra layer of abstraction, - Will reduce complexity overall - But add complexity locally

Concrete repository : SqlAlchemyRepository

[16]:
from typing import Optional, Callable
from sqlalchemy.orm import Session

class SqlAlchemyRepository(AbstractRepository):

    def __init__(self, session: Session):
        self.session = session

    def close(self):
        self.session.close()

    def add(self, batch: Batch):
        self.session.add(batch)
        self.session.commit()

    def get(self, reference) -> Optional[Batch]:
        return self.session.query(Batch).filter_by(reference=reference).first()

    def list(self):
        return self.session.query(model.Batch).all()

    def clear(self):
        self.session.execute('DELETE FROM allocation')
        self.session.execute('DELETE FROM batch')
        self.session.execute('DELETE FROM order_line')
        self.session.commit()

Repository test for saving an object

[17]:
@mytest.unit
def test_repository_can_save_a_batch(session):
    with session as db:
        repo = SqlAlchemyRepository(db)
        try:
            batch = Batch("batch1", "RUSTY-SOAPDISH", 100, eta=None)
            repo.add(batch)
            rows = list(db.execute(
                'SELECT reference, sku, _purchased_quantity, eta FROM "batch"'
            ))
            assert rows == [("batch1", "RUSTY-SOAPDISH", 100, None)]
        finally:
            repo.clear()
test_repository_can_save_a_batch

Repository test for retrieving a complex object

[18]:
def insert_order_line(db, orderid="order1", sku="GENERIC-SOFA"):
    db.execute(
        'INSERT INTO order_line (orderid, sku, qty) '
        'VALUES (:orderid, :sku, 12)',
        dict(orderid="order1", sku="GENERIC-SOFA")
    )
    [orderline_id], *_ = db.execute(
        'SELECT id FROM order_line WHERE orderid=:orderid AND sku=:sku',
        dict(orderid="order1", sku="GENERIC-SOFA")
    )
    return orderline_id


def insert_batch(db, reference, sku="GENERIC-SOFA", qty=100):
    db.execute(
        'INSERT INTO batch (reference, sku, _purchased_quantity) '
        'VALUES (:reference, :sku, :qty)',
        dict(reference=reference, sku=sku, qty=qty)
    )
    [batch_id], *_ = db.execute(
        'SELECT id FROM batch WHERE reference=:reference',
        dict(reference=reference)
    )
    return batch_id


def insert_allocation(db, orderline_id, batch_id):
    db.execute(
        'INSERT INTO allocation (orderline_id, batch_id) '
        'VALUES (:orderline_id, :batch_id)',
        dict(orderline_id=orderline_id, batch_id=batch_id)
    )

@mytest.unit
def test_repository_can_retrieve_a_batch_with_allocations(session):
    with session as db:
        repo = SqlAlchemyRepository(db)
        try:
            orderline_id = insert_order_line(db)
            batch1_id = insert_batch(db, "batch1", qty=100)
            batch2_id = insert_batch(db, "batch2")
            insert_allocation(db, orderline_id, batch1_id)

            retrieved = repo.get("batch1")
            expected = Batch("batch1", sku="GENERIC-SOFA", qty=100, eta=None)

            assert retrieved == expected  # Batch.__eq__ only compares reference
            assert retrieved.sku == expected.sku
            assert retrieved._purchased_quantity == expected._purchased_quantity
            assert retrieved._allocations == {
                OrderLine("order1", "GENERIC-SOFA", 12),
            }
        finally:
            repo.clear()
test_repository_can_retrieve_a_batch_with_allocations

Building a Fake Repository for Tests is Now Trivial!

[19]:
class FakeRepository(AbstractRepository):

    def __init__(self, batches):
        self._batches = set(batches)

    def close(self):
        pass

    def add(self, batch):
        self._batches.add(batch)

    def get(self, reference):
        return next(b for b in self._batches if b.reference == reference)

    def list(self):
        return list(self._batches)

    def clear(self):
        self._batches = set()
[20]:
def fake_repo():
    batch1 = Batch("batch1", sku="TEST-TABLE", qty=100)
    batch2 = Batch("batch2", sku="TEST-CHAIR", qty=50)
    batch3 = Batch("batch3", sku="TEST-CABLE", qty=10)
    return FakeRepository([batch1, batch2, batch3])

def fake_request_params():
    return [
        dict(orderid="order1", sku="TEST-TABLE", qty=10),
        dict(orderid="order2", sku="TEST-TABLE", qty=20),
        dict(orderid="order3", sku="TEST-CHAIR", qty=30),
        dict(orderid="order4", sku="TEST-CABLE", qty=10),
    ]
[21]:
from app.services import allocate

@mytest.unit
def test_fake_repo(fake_repo, session, fake_request_params):
    with session as db:
        batches = fake_repo.list()
        lines = [
            OrderLine(l['orderid'], l['sku'], l['qty'])
            for l in fake_request_params
        ]

        for line in lines:
            allocate(line, batches)
        db.commit()
test_fake_repo

What is a Port and What is an Adapter, in Python?

Ports and adapters came out of the OO world - port : an interface between our application and whatever we wish to abstract away - adapter : an implementation behind that interface or abstraction.

Python doesn’t have interfaces, so defining a port can be simulated with abc.ABC or Protocol.

In this chapter, - port : AbstractRepository. - adapters : SqlAlchemyRepository and FakeRepository.

Wrap-Up

  • Summerize the costs and benefits of each architectural pattern we introduce.

  • Every single application DO NOT need to be built this way

    • Make sure the overall complexity of the repository pattern makes it worth in adding extra layers of indirection.

    • If your app is just a simple CRUD wrapper around a database, then YOU DON’T NEED a domain model or a repository.

Pros

Cons

  • We have a simple interface between persistent storage and our domain model.

  • It’s easy to make a fake version of the repository for unit testing, or to swap out different storage solutions, because we’ve fully decoupled the model from infrastructure concerns.

  • Writing the domain model before thinking about persistence helps us focus on the business problem at hand. If we ever want to radically change our approach, we can do that in our model, without needing to worry about foreign keys or migrations until later.

  • Our database schema is really simple because we have complete control over how we map our objects to tables.

  • An ORM already buys you some decoupling. Changing foreign keys might be hard, but it should be pretty easy to swap between MySQL and Postgres if you ever need to.

  • Maintaining ORM mappings by hand requires extra work and extra code.

  • Any extra layer of indirection always increases maintenance costs and adds a “WTF factor” for Python programmers who’ve never seen the Repository pattern before.

../images/apwp_0206.png

Our domain model should be free of infrastructure concerns, so your ORM should import your model, and not the other way around.

The repository gives you the illusion of a collection of in-memory objects. It makes it easy to create a FakeRepository for testing and to swap fundamental details of your infrastructure without disrupting your core application. See [appendix_csvs] for an example.