Observables

1. What Are Observables?

Observables are a programming concept used to implement the Observer Pattern, where a function (or object) reacts to events dynamically. This means that:

  • An event is emitted when something significant happens (e.g., a user registers, a payment is processed, etc.).
  • Subscribers (listeners) react to that event without direct coupling to the emitter.
  • This allows for a highly flexible, decoupled, and scalable event-driven architecture.

Observables can be thought of as a stream of events. Where a promise delivers a single future result, an observable delivers values continuously over time. This enables reactive programming, where parts of the system automatically respond to changes.
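
The difference between a single future result and a stream of events can be sketched in plain Python. This is only an illustration and not Lilya's API: `EventStream`, `subscribe`, `emit`, and `fetch_once` are hypothetical names chosen for the example.

```python
import asyncio
from typing import Callable


class EventStream:
    """Hypothetical, minimal stream of events (illustration only)."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[str], None]] = []

    def subscribe(self, callback: Callable[[str], None]) -> None:
        self._subscribers.append(callback)

    def emit(self, value: str) -> None:
        # Every subscriber is called for every emitted value.
        for callback in self._subscribers:
            callback(value)


async def fetch_once() -> str:
    # A coroutine (promise-like) produces exactly one result.
    await asyncio.sleep(0)
    return "single result"


received: list[str] = []
stream = EventStream()
stream.subscribe(received.append)

single = asyncio.run(fetch_once())
for value in ("first", "second", "third"):
    stream.emit(value)
```

After this runs, `single` holds one value, while `received` holds every value emitted on the stream, in order.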

2. Benefits of Using Observables

Using observables in an application, especially within Lilya, comes with several advantages:

Decoupling & Maintainability

Observables help separate event producers (emitters) from event consumers (listeners). This reduces dependencies and makes the system easier to maintain.

Scalability & Extensibility

By using observables, new features can be added without modifying existing code. Developers can subscribe to events dynamically instead of changing core logic.
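
As a rough sketch of this idea, assuming a hypothetical in-memory registry rather than Lilya's own dispatcher, a second listener can be attached later while `register_user` stays untouched. The names `listeners`, `subscribe`, and `register_user` are invented for the example.

```python
from collections import defaultdict
from typing import Callable

# Hypothetical registry: event name -> list of listener callables.
listeners: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
log: list[str] = []


def subscribe(event: str, listener: Callable[[dict], None]) -> None:
    listeners[event].append(listener)


def register_user(name: str) -> dict:
    """Core logic: it only knows the event name, not who listens."""
    user = {"name": name}
    for listener in listeners["user_registered"]:
        listener(user)
    return user


subscribe("user_registered", lambda user: log.append(f"email:{user['name']}"))
register_user("ana")

# A later feature adds auditing without changing register_user.
subscribe("user_registered", lambda user: log.append(f"audit:{user['name']}"))
register_user("bob")
```

The first registration reaches only the email listener; the second reaches both, even though the core function was never edited.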

Concurrency & Efficiency

With async-based event dispatching, Lilya handles multiple listeners without blocking execution, so a slow listener does not hold up the others. This can improve responsiveness in real-time applications.
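
The effect of concurrent dispatch can be illustrated with the standard library alone. This sketch uses `asyncio.gather` as a stand-in and does not reproduce Lilya's internals; the listener names and sleep durations are made up for the example.

```python
import asyncio

timeline: list[str] = []


async def slow_listener() -> None:
    timeline.append("slow:start")
    await asyncio.sleep(0.05)  # simulated slow I/O
    timeline.append("slow:end")


async def fast_listener() -> None:
    timeline.append("fast:start")
    await asyncio.sleep(0.01)  # simulated fast I/O
    timeline.append("fast:end")


async def emit() -> None:
    # Both listeners run concurrently; neither waits for the other to finish.
    await asyncio.gather(slow_listener(), fast_listener())


asyncio.run(emit())
```

The fast listener finishes while the slow one is still waiting, which is the behaviour a sequential loop over listeners would not give.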

Code Reusability

Once an observable event is defined, it can be reused across multiple parts of the application, reducing redundant logic.

Better Error Handling

Observables allow for centralized error handling on emitted events, making debugging more manageable.
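
One way such centralized handling might look is sketched below. It is an assumption for illustration, not a description of how Lilya reports listener errors: the `emit` helper, the `errors` list, and both listeners are hypothetical.

```python
import asyncio

errors: list[tuple[str, str, str]] = []
done: list[str] = []


async def send_email() -> None:
    raise RuntimeError("SMTP server unavailable")


async def write_audit_log() -> None:
    done.append("audit")


async def emit(event: str, listeners) -> None:
    # return_exceptions=True keeps one failing listener from stopping the rest.
    results = await asyncio.gather(
        *(listener() for listener in listeners), return_exceptions=True
    )
    for listener, result in zip(listeners, results):
        if isinstance(result, Exception):
            # Single place where every listener failure is recorded.
            errors.append((event, listener.__name__, str(result)))


asyncio.run(emit("user_registered", [send_email, write_audit_log]))
```

The failing listener is recorded in one place, and the healthy listener still completes.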

3. Why Should You Use Observables?

Observables are useful in scenarios where something happens and multiple parts of the system need to react independently.

For example:

  • User Registration Events → Send a welcome email, log activity, and assign default roles.
  • Payment Processing → Notify the user, update the database, and trigger order fulfillment.
  • Live Data Streaming → Real-time notifications, stock updates, or WebSocket messages.
  • Background Tasks → Perform long-running operations (e.g., data processing, cleanup).
  • Logging & Monitoring → Collect application metrics without affecting request performance.

In Lilya, observables allow for an efficient and scalable event-driven approach, which makes them well suited to high-performance applications.


4. How Observables Are Applied in Lilya

Lilya provides built-in support for observables through the @observable decorator and EventDispatcher.

🔹 Key Components

  1. @observable Decorator: Defines functions that can emit and/or listen for events.
  2. EventDispatcher: Manages event subscriptions and emissions asynchronously.
  3. Async and Sync Support: Supports both asynchronous and synchronous event handlers.
  4. Concurrency Handling: Uses anyio.create_task_group() to handle multiple listeners in parallel.
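
To show how these pieces could fit together, here is a simplified stand-in written with the standard library only. It is not Lilya's implementation: `toy_observable`, `_listeners`, and `_emit` are hypothetical names, and `asyncio.gather` replaces the anyio task group mentioned above.

```python
import asyncio
import functools
import inspect
from collections import defaultdict

# Hypothetical registry standing in for an event dispatcher.
_listeners = defaultdict(list)
calls: list[str] = []


async def _emit(event: str) -> None:
    async def run(listener):
        result = listener()
        if inspect.isawaitable(result):  # async listener: await it
            await result

    await asyncio.gather(*(run(listener) for listener in _listeners[event]))


def toy_observable(send=(), listen=()):
    """Simplified stand-in for an @observable-style decorator."""

    def decorator(func):
        # Register the function as a listener for each event it listens to.
        for event in listen:
            _listeners[event].append(func)
        if not send:
            return func

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
            # Emit the declared events after the function has finished.
            for event in send:
                await _emit(event)
            return result

        return wrapper

    return decorator


@toy_observable(send=["user_registered"])
async def register_user():
    calls.append("register")
    return {"message": "User registered successfully!"}


@toy_observable(listen=["user_registered"])
async def send_welcome_email():
    calls.append("email")


@toy_observable(listen=["user_registered"])
def assign_default_roles():  # synchronous listener
    calls.append("roles")


response = asyncio.run(register_user())
```

In this sketch a synchronous listener is simply called inline, so a long-running synchronous handler would block the event loop; a real dispatcher may handle that case differently.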

5. Real-World Examples Using Lilya

Example 1: User Registration with Multiple Side Effects

A user registers, and multiple actions occur without coupling the logic together.

from lilya.apps import Lilya
from lilya.decorators import observable

app = Lilya()

# User registration endpoint
@app.post("/register")
@observable(send=["user_registered"])
async def register_user(data: dict):
    return {"message": "User registered successfully!"}


# Listeners for the event
@observable(listen=["user_registered"])
async def send_welcome_email():
    print("Sending welcome email...")


@observable(listen=["user_registered"])
async def assign_default_roles():
    print("Assigning default roles to the user...")

What Happens Here?

  ✅ A user registers → Event "user_registered" is emitted.
  ✅ The system automatically triggers listeners → Email is sent, roles are assigned.
  ✅ No direct dependency → Can easily add more listeners in the future.


Example 2: Payment Processing

When a payment is made, multiple systems react to the event.

from lilya.apps import Lilya
from lilya.decorators import observable

app = Lilya()


@app.post("/pay")
@observable(send=["payment_success"])
async def process_payment():
    return {"message": "Payment processed!"}


@observable(listen=["payment_success"])
async def notify_user():
    print("Notifying user about payment confirmation...")


@observable(listen=["payment_success"])
async def update_database():
    print("Updating payment database records...")


@observable(listen=["payment_success"])
async def generate_invoice():
    print("Generating invoice for the payment...")

One event triggers multiple independent processes. Fully decoupled logic for better maintainability.


Example 3: Logging User Activity

from lilya.apps import Lilya
from lilya.decorators import observable

app = Lilya()


@app.post("/login")
@observable(send=["user_logged_in"])
async def login():
    return {"message": "User logged in!"}


@observable(listen=["user_logged_in"])
async def log_login_activity():
    print("Logging user login activity...")

Logs login activity without modifying authentication logic.


Example 4: Real-Time Notifications

from lilya.apps import Lilya
from lilya.decorators import observable

app = Lilya()


@app.post("/comment")
@observable(send=["new_comment"])
async def add_comment():
    return {"message": "Comment added!"}


@observable(listen=["new_comment"])
async def send_notification():
    print("Sending notification about the new comment...")

Users get notified immediately after a comment is posted.


Example 5: Background Data Processing

from lilya.apps import Lilya
from lilya.decorators import observable

app = Lilya()

@app.post("/upload")
@observable(send=["file_uploaded"])
async def upload_file():
    return {"message": "File uploaded successfully!"}


@observable(listen=["file_uploaded"])
async def process_file():
    print("Processing file in the background...")

Heavy file processing runs asynchronously, without blocking the request.


Example 6: Scheduled Tasks & Cleanup Jobs

from lilya.decorators import observable

@observable(send=["daily_cleanup"])
async def trigger_cleanup():
    print("Daily cleanup event triggered!")


@observable(listen=["daily_cleanup"])
async def delete_old_records():
    print("Deleting old database records...")


@observable(listen=["daily_cleanup"])
async def clear_cache():
    print("Clearing application cache...")

The scheduled task runs automatically and triggers multiple cleanup tasks.


Conclusion

Observables in Lilya allow developers to build efficient, scalable, and maintainable event-driven applications. By leveraging @observable and EventDispatcher:

  ✔️ Events are decoupled from logic, improving maintainability.
  ✔️ Asynchronous execution improves performance.
  ✔️ Functionality can be extended without modifying existing code.
  ✔️ The architecture stays clean and modular.

Whether you're handling user events, background jobs, notifications, or real-time updates, observables empower you to build dynamic and reactive applications. 🚀