Pragmatic
EventSourcing

at PHPBenelux

Frank de Jonge
Freelance Developer
Flysystem

EventSauce.

A pragmatic event sourcing library for PHP with a focus on developer experience.

Learning EventSourcing is pretty hard and takes time.

EventSourcing is built on a number of techniques.

EventSourcing is not hard, but it is complex.

The individual parts don't make a lot of sense.

Talk is cheap, show me some code.

  • Event
  • Aggregate Root
  • Aggregate Root Repository
  • Message
  • Message Repository
  • Message Dispatcher
  • Consumer(s)
  • Projection(s)
  • Read Model(s)
  • Process Manager(s)

                        $credit = 100;

                        $purchase = 80;

                        if ($credit >= $purchase) {
                            $credit -= $purchase;
                            echo "Purchase of $purchase is OK!";
                        } else {
                            echo "Purchase of $purchase has failed.";
                        }
                    

                        $credit = 60;

                        $purchase = 80;

                        if ($credit >= $purchase) {
                            $credit -= $purchase;
                            echo "Purchase of $purchase is OK!";
                        } else {
                            echo "Purchase of $purchase has failed.";
                        }
                    

                        $credit = fetch_credit();
                        echo "Credit is $credit\n";
                        $purchase = 80;

                        if ($credit >= $purchase) {
                            $credit -= $purchase;
                            echo "Purchase of $purchase is OK!";
                        } else {
                            echo "Purchase of $purchase has failed.";
                        }
                    

                        $credit = fetch_credit();
                        echo "Credit is $credit\n";
                        $purchase = amount_from_request();

                        if ($credit >= $purchase) {
                            $credit -= $purchase;
                            echo "Purchase of $purchase is OK!";
                        } else {
                            echo "Purchase of $purchase has failed.";
                        }
                    

In a given context,
when handling input
a decision is made
which then causes a result.


                        class PersonalBudget {
                            private $credit;
                            public function __construct(int $credit) {
                                $this->credit = $credit;
                            }
                            public function spend(int $amount): bool {
                                if ($amount > $this->credit) return false;

                                $this->credit -= $amount;

                                return true;
                            }
                        }
                    

                        $personalBudget = $repository->retrieve($id);

                        if ($personalBudget->spend(amount_to_spend())) {
                            echo "Spending succeeded";
                        } else {
                            echo "Spending failed";
                        }

                        $repository->persist($personalBudget);
                    

                        $personalBudget = PersonalBudget::find($id);

                        if ($personalBudget->spend(amount_to_spend())) {
                            echo "Spending succeeded";
                        } else {
                            echo "Spending failed";
                        }

                        $personalBudget->save();
                    

EventSourcing


                        $personalBudget = $repository->retrieve($id);

                        if ($personalBudget->spend(amount_to_spend())) {
                            echo "Spending succeeded";
                        } else {
                            echo "Spending failed";
                        }

                        $repository->persist($personalBudget);
                    

  1. Load the model from the database.
  2. Interact with the model.
  3. Persist updated model.

(Entity Modeling)

  1. Reconstitute the Aggregate from old events.
  2. Interact with the model.
  3. Persist/dispatch new events.

(Event Sourcing)
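
The three event sourcing steps above can be sketched in plain PHP. This is only a rough illustration: events are plain arrays, the array-based store and the helper names `reconstitute` and `decide` are made up for this sketch, and none of it is EventSauce's API.

```php
<?php
declare(strict_types=1);

// Hypothetical, simplified event store: a list of events per aggregate id.
$eventStore = [
    'budget-1' => [
        ['type' => 'BudgetWasIncreased', 'amount' => 100],
        ['type' => 'PurchaseWasMade', 'amount' => 30],
    ],
];

// Step 1: reconstitute the current state by replaying old events.
function reconstitute(array $events): int
{
    $budget = 0;
    foreach ($events as $event) {
        if ($event['type'] === 'BudgetWasIncreased') {
            $budget += $event['amount'];
        } elseif ($event['type'] === 'PurchaseWasMade') {
            $budget -= $event['amount'];
        }
    }

    return $budget;
}

// Step 2: interact with the model; the decision is expressed as a new event.
function decide(int $budget, int $amount): array
{
    return $budget >= $amount
        ? ['type' => 'PurchaseWasMade', 'amount' => $amount]
        : ['type' => 'BudgetWasInsufficient', 'amount' => $amount];
}

$budget = reconstitute($eventStore['budget-1']);
$newEvent = decide($budget, 50);

// Step 3: persist the new event by appending it; nothing is overwritten.
$eventStore['budget-1'][] = $newEvent;

echo $newEvent['type'], PHP_EOL;
```

The contrast with entity modeling is in step 3: the stored history only grows, and the current budget is always derived from it.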

Event
a representation of a historical fact; something that happened in the past.
Event
a value-object; an object that represents a value


                    class PurchaseWasMade
                    {
                        private $amount;

                        public function __construct(int $amount) {
                            $this->amount = $amount;
                        }

                        public function amount(): int {
                            return $this->amount;
                        }
                    }
                    

                    class PurchaseWasMade
                    {
                        // ...

                        public function toPayload(): array {
                            return ['amount' => $this->amount];
                        }

                        public static function fromPayload(array $payload) {
                            return new static($payload['amount']);
                        }
                    }
                    

                        $event1 = new PurchaseWasMade(150);

                        $payload = $event1->toPayload();

                        $event2 = PurchaseWasMade::fromPayload($payload);

                        var_dump('same', $event1 === $event2); // false: two distinct instances

                        var_dump('equal', $event1 == $event2); // true: same class, same property values
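
Because the payload is a plain array, it can also survive a trip through JSON, which is what makes storing events practical. A minimal sketch, repeating the event class from the slides so it runs on its own (the `final` keyword and the `self` return type are choices made for this sketch):

```php
<?php
declare(strict_types=1);

final class PurchaseWasMade
{
    private $amount;

    public function __construct(int $amount)
    {
        $this->amount = $amount;
    }

    public function amount(): int
    {
        return $this->amount;
    }

    public function toPayload(): array
    {
        return ['amount' => $this->amount];
    }

    public static function fromPayload(array $payload): self
    {
        return new self($payload['amount']);
    }
}

$original = new PurchaseWasMade(150);

// Serialize to a string, as a database column or queue message would hold it.
$json = json_encode($original->toPayload());

// Decode to an associative array and rebuild the event from it.
$restored = PurchaseWasMade::fromPayload(json_decode($json, true));

echo $json, PHP_EOL;               // {"amount":150}
var_dump($original == $restored);  // bool(true)
```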
                    
Aggregate Root
a whole formed by combining several separate elements
Aggregate Root
the entry point for our model


        class PersonalBudget
        {
            private $id;
            private $budget = 0;

            public function __construct(AggregateRootId $id)
            {
                $this->id = $id;
            }
        }
                    

        class PersonalBudget
        {
            // ...

            public function increaseBudget(int $amount)
            {
                $this->budget += $amount;
            }

        }
                    

        class PersonalBudget
        {
            // ...

            public function increaseBudget(int $amount)
            {
                $this->recordThat(new BudgetWasIncreased($amount));
            }

        }
                    

class PersonalBudget
{
    // ...

    protected function recordThat(object $event)
    {
        $this->apply($event);
        $this->recordedEvents[] = $event;
    }
}
                    

class PersonalBudget
{
    // ...

    protected function apply(object $event)
    {
        $parts = explode('\\', get_class($event));
        $this->{'apply' . end($parts)}($event);
    }
}
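
To see what the `apply` method above resolves to, here is a small sketch of the name derivation on its own. The namespace `Acme\Budgeting` is a made-up example; only the short class name ends up in the method name.

```php
<?php
declare(strict_types=1);

namespace Acme\Budgeting; // hypothetical namespace, for illustration only

final class BudgetWasIncreased
{
}

// get_class() returns the fully qualified name: Acme\Budgeting\BudgetWasIncreased
$parts = explode('\\', get_class(new BudgetWasIncreased()));

// end() returns the last segment, i.e. the short class name.
$method = 'apply' . end($parts);

echo $method, PHP_EOL; // applyBudgetWasIncreased
```

A consequence of this convention is that two event classes with the same short name in different namespaces would map to the same apply method.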
                    

class PersonalBudget
{
    // ...

    protected function applyBudgetWasIncreased(
        BudgetWasIncreased $event
    ) {
        $this->budget += $event->amount();
    }
}
                    

class PersonalBudget
{
    public function spend(int $amount)
    {
        if ($this->budget >= $amount) {
            $this->budget -= $amount;

            return true;
        } else {
            return false;
        }
    }
}
                    

class PersonalBudget
{
    public function spend(int $amount)
    {
        if ($this->budget >= $amount) {
            $this->recordThat(new PurchaseWasMade($amount));

            return true;
        } else {
            return false;
        }
    }
}
                    

class PersonalBudget
{
    public function spend(int $amount)
    {
        if ($this->budget >= $amount) {
            $this->recordThat(new PurchaseWasMade($amount));

            return true;
        } else {
            $this->recordThat(new BudgetWasInsufficient($amount));

            return false;
        }
    }
}
                    

class PersonalBudget
{
    protected function applyPurchaseWasMade(
        PurchaseWasMade $event
    ) {
        $this->budget -= $event->amount();
    }

    // ...
}
                    

                $personalBudget->increaseBudget(50);
                $personalBudget->increaseBudget(100);
                $personalBudget->spend(75);

                // Can I spend 10 now?
                var_dump($personalBudget->spend(10));
                    

                $personalBudget->increaseBudget(50);
                $personalBudget->increaseBudget(100);
                $personalBudget->spend(75);

                // Can I spend 100 now?
                var_dump($personalBudget->spend(100));
                    

                new BudgetWasIncreased(50);
                new BudgetWasIncreased(100);
                new PurchaseWasMade(75);

                // Can I spend 100 now?
                new BudgetWasInsufficient(100);
                    

class PersonalBudget
{
    protected function recordThat(object $event)
    {
        $this->apply($event);
        $this->recordedEvents[] = $event;
    }

    public function releaseEvents(): array
    {
        $releasedEvents = $this->recordedEvents;
        $this->recordedEvents = [];

        return $releasedEvents;
    }
}
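
The fragments above can be assembled into one runnable class. This is a sketch under a few assumptions that are not on the slides: the aggregate id is left out, the three events share a hypothetical `AmountEvent` base class to keep things short, and `applyBudgetWasInsufficient` is assumed to leave the budget untouched.

```php
<?php
declare(strict_types=1);

// Assumption: a shared base class keeps the three events short in this sketch.
abstract class AmountEvent
{
    private $amount;

    public function __construct(int $amount)
    {
        $this->amount = $amount;
    }

    public function amount(): int
    {
        return $this->amount;
    }
}

final class BudgetWasIncreased extends AmountEvent {}
final class PurchaseWasMade extends AmountEvent {}
final class BudgetWasInsufficient extends AmountEvent {}

final class PersonalBudget
{
    private $budget = 0;
    private $recordedEvents = [];

    public function increaseBudget(int $amount): void
    {
        $this->recordThat(new BudgetWasIncreased($amount));
    }

    public function spend(int $amount): bool
    {
        if ($this->budget >= $amount) {
            $this->recordThat(new PurchaseWasMade($amount));

            return true;
        }

        $this->recordThat(new BudgetWasInsufficient($amount));

        return false;
    }

    public function releaseEvents(): array
    {
        $releasedEvents = $this->recordedEvents;
        $this->recordedEvents = [];

        return $releasedEvents;
    }

    private function recordThat(object $event): void
    {
        $this->apply($event);
        $this->recordedEvents[] = $event;
    }

    private function apply(object $event): void
    {
        $parts = explode('\\', get_class($event));
        $this->{'apply' . end($parts)}($event);
    }

    private function applyBudgetWasIncreased(BudgetWasIncreased $event): void
    {
        $this->budget += $event->amount();
    }

    private function applyPurchaseWasMade(PurchaseWasMade $event): void
    {
        $this->budget -= $event->amount();
    }

    // Assumption: a rejected purchase is recorded but does not change the budget.
    private function applyBudgetWasInsufficient(BudgetWasInsufficient $event): void
    {
    }
}

$personalBudget = new PersonalBudget();
$personalBudget->increaseBudget(50);
$personalBudget->increaseBudget(100);
$personalBudget->spend(75);
$canSpend = $personalBudget->spend(100); // false: only 75 is left

$names = array_map('get_class', $personalBudget->releaseEvents());
print_r($names);
```

Releasing the events empties the internal list, so a second call to `releaseEvents()` returns an empty array until something new is recorded.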
                    
Aggregate Root Repository
is responsible for storing recorded events and reconstituting aggregates

interface AggregateRootRepository
{
    public function persist(object $aggregateRoot);

    public function retrieve(AggregateRootId $id): object;
}
                    

                        class AggregateRootRepository
                        {
                            public function persist(AggregateRoot $aggregate)
                            {
                                $events = $aggregate->releaseEvents();
                                $aggregateRootId = $aggregate->aggregateRootId();
                                $headers = [Header::AGGREGATE_ROOT_ID => $aggregateRootId];
                                $messages = array_map(function (object $event) use ($headers) {
                                    return new Message($event, $headers);
                                }, $events);

                                $this->messages->persist(...$messages);
                                $this->dispatcher->dispatch(...$messages);
                            }
                        }
                    

Message
a wrapper for an event containing additional metadata (aggregate id, event type, etc)

class Message
{
    /** @var object */
    private $event;

    /** @var array */
    private $headers = [];

    public function __construct(object $event, array $headers = [])
    {
        $this->event = $event;
        $this->headers = $headers;
    }

    // ...
}
                    

interface MessageRepository
{
    public function persist(Message ...$messages);

    public function retrieveAll(AggregateRootId $id): Generator;
}
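
To make the interface less abstract, here is a rough array-backed sketch of the same two operations. The class name `ArrayMessageRepository`, the string ids, and the `'aggregate_root_id'` header key are stand-ins chosen for this example; the `Message` class is a trimmed copy so the code runs on its own.

```php
<?php
declare(strict_types=1);

// Trimmed stand-in for the Message class shown above.
final class Message
{
    private $event;
    private $headers;

    public function __construct(object $event, array $headers = [])
    {
        $this->event = $event;
        $this->headers = $headers;
    }

    public function event(): object
    {
        return $this->event;
    }

    public function header(string $name)
    {
        return $this->headers[$name] ?? null;
    }
}

// Hypothetical repository that keeps every message in an append-only array.
final class ArrayMessageRepository
{
    /** @var Message[] */
    private $messages = [];

    public function persist(Message ...$messages): void
    {
        foreach ($messages as $message) {
            $this->messages[] = $message;
        }
    }

    // Yields only the messages of one aggregate, in the order they were stored.
    public function retrieveAll(string $aggregateRootId): Generator
    {
        foreach ($this->messages as $message) {
            if ($message->header('aggregate_root_id') === $aggregateRootId) {
                yield $message;
            }
        }
    }
}

$repository = new ArrayMessageRepository();
$repository->persist(
    new Message((object) ['amount' => 80], ['aggregate_root_id' => 'budget-1']),
    new Message((object) ['amount' => 20], ['aggregate_root_id' => 'budget-2']),
    new Message((object) ['amount' => 5], ['aggregate_root_id' => 'budget-1'])
);

$amounts = [];
foreach ($repository->retrieveAll('budget-1') as $message) {
    $amounts[] = $message->event()->amount;
}

print_r($amounts); // 80 and 5, in the order they were persisted
```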
                    

CREATE TABLE domain_messages (
    event_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    aggregate_root_id VARCHAR(36) NULL,
    time_of_recording DATETIME(6) NOT NULL,
    payload JSON NOT NULL,
    INDEX aggregate_root_id (aggregate_root_id),
    INDEX time_of_recording (time_of_recording)
)
                    

                    {
                        "header": {
                            "aggregate_root_id": "4d975448-d12b-4a2e-bea4-13efd557bd6e",
                            "event_type": "some_company.namespace.purchase_was_made"
                        },
                        "payload": {
                            "amount": 80
                        }
                    }
                    

                        interface MessageDispatcher
                        {
                            public function dispatch(Message ...$messages);
                        }
                    

class SynchronousMessageDispatcher implements MessageDispatcher
{
    private $consumers;

    public function __construct(Consumer ...$consumers)
    {
        $this->consumers = $consumers;
    }

    public function dispatch(Message ...$messages)
    {
        foreach ($messages as $message) {
            foreach ($this->consumers as $consumer) {
                $consumer->handle($message);
            }
        }
    }
}
                    

Synchronous

✅ No extra infrastructure
✅ Good enough (?)
❌ No retry mechanisms
❌ No horizontal scaling
❌ No easy rebuilds

Asynchronous

❌ Extra infrastructure
❌ More complex setup
✅ Retry mechanisms
✅ Horizontal scaling
✅ Rebuilds possible
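
What "retry mechanisms" buys you can be sketched without any real infrastructure. In this example an `SplQueue` stands in for a message broker, messages are plain arrays, and the consumer is rigged to fail once; all of these are assumptions for illustration, not how a production setup would look.

```php
<?php
declare(strict_types=1);

// Hypothetical in-process stand-in for a real message queue.
$queue = new SplQueue();

// The dispatching side only enqueues; nothing is handled yet.
$dispatch = function (array ...$messages) use ($queue): void {
    foreach ($messages as $message) {
        $queue->enqueue(['message' => $message, 'attempts' => 0]);
    }
};

// A consumer that fails the first time it sees a message, to force a retry.
$seen = [];
$handled = [];
$consumer = function (array $message) use (&$seen, &$handled): void {
    $key = $message['event_type'] . ':' . $message['amount'];

    if (!isset($seen[$key])) {
        $seen[$key] = true;
        throw new RuntimeException('temporary failure');
    }

    $handled[] = $key;
};

$dispatch(['event_type' => 'purchase_was_made', 'amount' => 80]);

// The worker side: give each message up to three attempts.
$maxAttempts = 3;
while (!$queue->isEmpty()) {
    $job = $queue->dequeue();

    try {
        $consumer($job['message']);
    } catch (RuntimeException $exception) {
        if (++$job['attempts'] < $maxAttempts) {
            $queue->enqueue($job); // put it back for another attempt
        }
    }
}

print_r($handled);
```

With the synchronous dispatcher, the same temporary failure would have surfaced directly in the request that persisted the event.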
Consumer
something that reacts to events


                        interface Consumer
                        {
                            public function handle(Message $message);
                        }
                    
Projection
something that uses event data to create a read-model (view)
Read Model
a view of the event data optimized for reading


                class MostSpentList implements Consumer
                {
                    public function handle(Message $message)
                    {
                        $event = $message->event();

                        if ($event instanceof PurchaseWasMade) {
                            $this->spentPerUser->increase(
                                $message->aggregateRootId(),
                                $event->amount()
                            );
                        }
                    }






                }
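
The projection above writes to `$this->spentPerUser`, which is the read model. The slides do not show it, so the following is only a guess at its shape: an in-memory class with the `increase` method the projection calls, plus a hypothetical `mostSpent` query method.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory read model; a real one would likely sit on a database.
final class InMemorySpentPerUser
{
    /** @var array<string, int> total spent, keyed by user id */
    private $totals = [];

    public function increase(string $userId, int $amount): void
    {
        $this->totals[$userId] = ($this->totals[$userId] ?? 0) + $amount;
    }

    // Returns the top spenders, highest total first.
    public function mostSpent(int $limit): array
    {
        $sorted = $this->totals;
        arsort($sorted); // sort by value, descending, keeping the user ids as keys

        return array_slice($sorted, 0, $limit, true);
    }
}

$readModel = new InMemorySpentPerUser();
$readModel->increase('alice', 80);
$readModel->increase('bob', 120);
$readModel->increase('alice', 60);

print_r($readModel->mostSpent(1)); // alice => 140
```

The read model holds no business rules; it only stores what the projection derived from the events, in whatever shape is cheapest to query.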
                    

                class HighestPurchasePerMonth implements Consumer
                {
                    public function handle(Message $message)
                    {
                        $event = $message->event();

                        if ($event instanceof PurchaseWasMade) {
                            $period = substr(
                                $message->header(Header::TIME_OF_RECORDING), 0, 7
                            ); // 2019-01
                            $currentHighest = $this->purchases->highest($period);

                            if ($currentHighest < $event->amount()) {
                                $this->purchases->updateHighest(
                                    $period, $event->amount()
                                );
                            }
                        }
                    }
                }
                    
❌ No automatic view
❌ Separate setup required
✅ Views are independent
✅ Use far simpler views
✅ Use any type of DB

                        $personalBudget = new PersonalBudget($id);

                        if ($personalBudget->spend(amount_to_spend())) {
                            echo "Spending succeeded";
                        } else {
                            echo "Spending failed";
                        }

                        $repository->persist($personalBudget);
                    

                        $personalBudget = $repository->retrieve($id);

                        if ($personalBudget->spend(amount_to_spend())) {
                            echo "Spending succeeded";
                        } else {
                            echo "Spending failed";
                        }

                        $repository->persist($personalBudget);
                    
Reconstitution
the action of building something up again

  1. Retrieve relevant events
  2. Create an aggregate instance
  3. Apply all events to the model


class AggregateRootRepository
{
    public function __construct(
        string $aggregateRootClassName,
        MessageRepository $messageRepository
    ) {
        $this->aggregateRootClassName = $aggregateRootClassName;
        $this->messages = $messageRepository;
    }



}
                    

class AggregateRootRepository
{
    public function retrieve(AggregateRootId $id): object
    {
        $className = $this->aggregateRootClassName;

        return $className::reconstituteFromEvents(
            $id,
            $this->retrieveAllEvents($id)
        );
    }

}
                    

class AggregateRootRepository
{
    private function retrieveAllEvents(AggregateRootId $id): Generator
    {
        $messages = $this->messages->retrieveAll($id);

        /** @var Message $message */
        foreach ($messages as $message) {
            yield $message->event();
        }
    }

}
                    

                SELECT *
                    FROM `domain_messages`
                    WHERE `aggregate_root_id` = :aggregate_root_id
                    ORDER BY `time_of_recording` ASC
                    

class PersonalBudget
{
    public static function reconstituteFromEvents(
        AggregateRootId $id,
        Generator $events
    ): PersonalBudget
    {
        $budget = new static($id);
        foreach ($events as $event) $budget->apply($event);

        return $budget;
    }
}
                    

A reconstituted aggregate is ready to make a decision.

                        $personalBudget = $repository->retrieve($id);

                        if ($personalBudget->spend(amount_to_spend())) {
                            echo "Spending succeeded";
                        } else {
                            echo "Spending failed";
                        }

                        $repository->persist($personalBudget);
                    

How about tests?


                        $messageRepository = new InMemoryMessageRepository();
                        $aggregateRootRepository = new AggregateRootRepository(
                            PersonalBudget::class,
                            $messageRepository
                        );

                        $id = AggregateRootId::create();
                        $headers = [Header::AGGREGATE_ROOT_ID => $id];
                        $messageRepository->persist(
                            new Message(new BudgetWasIncreased(100), $headers)
                        );

                        $budget = $aggregateRootRepository->retrieve($id);

                        $budget->spend(150);

                        $aggregateRootRepository->persist($budget);
                        var_dump($messageRepository->lastCommit());
                    

Given
Certain events happened
When
Handling a type of input
Then
We can assert the output

                        /**
                         * @test
                         */
                        public function it_allows_to_spend_within_the_budget()
                        {
                            $id = $this->aggregateRootId();

                            $this->given(
                                new BudgetWasIncreased(100)

                            )->when(
                                new MakePurchase($id, 40)
                            )->then(
                                new PurchaseWasMade(40)
                            );
                        }
                    

                        /**
                         * @test
                         */
                        public function it_disallows_to_spend_over_budget()
                        {
                            $id = $this->aggregateRootId();

                            $this->given(
                                new BudgetWasIncreased(100),
                                new PurchaseWasMade(80)
                            )->when(
                                new MakePurchase($id, 40)
                            )->then(
                                new BudgetWasInsufficient(40)
                            );
                        }
                    

                        /**
                         * @test
                         */
                        public function it_disallows_to_spend_over_budget()
                        {
                            $id = $this->aggregateRootId();

                            $this->given(
                                new BudgetWasIncreased(100),
                                new PurchaseWasMade(80)
                            )->when(
                                new MakePurchase($id, 40)
                            )->expectToFail(
                                new InsufficientFundsException()
                            );
                        }
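
The given/when/then tests above rely on a test harness that the slides do not show. Its core idea can be sketched in a few functions: replay the given events, run the input, and compare the newly produced events with the expected ones. Events are `[type, amount]` pairs here, and `replay`, `makePurchase` and `scenario` are names invented for this sketch, not EventSauce's test tooling.

```php
<?php
declare(strict_types=1);

// Given: rebuild the budget from events that already happened.
function replay(array $events): int
{
    $budget = 0;
    foreach ($events as [$type, $amount]) {
        if ($type === 'BudgetWasIncreased') {
            $budget += $amount;
        } elseif ($type === 'PurchaseWasMade') {
            $budget -= $amount;
        }
    }

    return $budget;
}

// When: handle the input and return the events it produces.
function makePurchase(int $budget, int $amount): array
{
    return $budget >= $amount
        ? [['PurchaseWasMade', $amount]]
        : [['BudgetWasInsufficient', $amount]];
}

// Then: the produced events must match the expected ones exactly.
function scenario(array $given, int $purchase, array $then): bool
{
    return makePurchase(replay($given), $purchase) === $then;
}

$withinBudget = scenario(
    [['BudgetWasIncreased', 100]],
    40,
    [['PurchaseWasMade', 40]]
);

$overBudget = scenario(
    [['BudgetWasIncreased', 100], ['PurchaseWasMade', 80]],
    40,
    [['BudgetWasInsufficient', 40]]
);

var_dump($withinBudget, $overBudget);
```

Both scenarios mirror the two test methods above: the assertions are on events only, never on the internal state of the aggregate.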
                    

When to apply this:

  • Process modelling
  • Auditing requirements
  • Need to communicate change
  • Need different kinds of read models

With Event Sourcing

Simple tasks are more work.

Difficult tasks are easier.

EventSauce

✅ Event Sourcing
✅ Test Tooling
✅ Code Generation

I'm

@frankdejonge

Thank you for your time.