I recently came across Python’s atexit module and became curious about practical use cases in real-world applications. To explore this, I created a simple client-server app: the client publishes data periodically, and the server consumes and stores it.

A bit about atexit before we get into the app:

The atexit module defines functions to register and unregister cleanup functions. Functions thus registered are automatically executed upon normal interpreter termination.

In short, you can register functions with atexit to perform cleanup tasks when your Python application exits normally.

However, there’s a caveat: these functions won’t run if a fatal internal error occurs or if you use os._exit(). In such cases, the signal module can help — it allows you to handle termination signals like SIGINT or SIGTERM and run custom cleanup code before the process exits.

Code:

Server:

The server listens on 0.0.0.0:12345 and uses MyUDPHandler to queue incoming data into ingest_queue. A separate daemon thread continuously processes this queue in the background. Because daemon threads don’t block the program from exiting, it’s important to use atexit to handle cleanup. The shutdown_handler function, registered with atexit, ensures that any remaining data still in the queue is properly flushed to the database when the server shuts down gracefully.

Note: This example uses a shared in-memory SQLite database for testing purposes. In a real app, you’d want to use a persistent on-disk database or a shared DB connection pool.

import atexit
import queue
import time
from typing import TypedDict
import threading
import json
import socketserver
from typing import override
import logging
import sqlite3

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(thread)d | %(message)s",
)
logger = logging.getLogger(__name__)


SENSOR_MEASUREMENT_TABLE = "sensor_measurements"


def get_connection():
    # Using an in-memory database for testing purposes
    return sqlite3.connect(
        "file:memdb1?mode=memory&cache=shared",
        uri=True,
        timeout=20,
        check_same_thread=False,
    )


def initialize_db():
    with get_connection() as conn:
        conn.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {SENSOR_MEASUREMENT_TABLE} (
                sensor_id integer,
                temperature decimal,
                humidity decimal,
                ingested_at timestamp);
            """
        )


class SensorMetrics(TypedDict):
    sensor_id: int
    temperature: float
    humidity: float
    ingested_at: float


ingest_queue: queue.Queue[SensorMetrics] = queue.Queue()


def _persist_data(metrics: list[SensorMetrics], conn: sqlite3.Connection):
    buffer = [(*metric.values(),) for metric in metrics]
    try:
        conn.executemany(
            f"""
            INSERT INTO {SENSOR_MEASUREMENT_TABLE} 
            (sensor_id, temperature, humidity, ingested_at)
            values (?,?,?,?)""",
            buffer,
        )
        conn.commit()
    except Exception:
        logger.exception("Error occurred while persisting data")


class MyUDPHandler(socketserver.DatagramRequestHandler):
    @override
    def handle(self):
        logger.info("Processing request from %s", self.client_address)
        data: SensorMetrics = json.loads(self.request[0].strip().decode("utf-8"))
        ingest_queue.put(data)


def _process_queued_data():
    with get_connection() as conn:
        while True:
            # Emulating a delay to simulate real-time processing
            time.sleep(0.1)
            _persist_data([ingest_queue.get()], conn)


def shutdown_handler():
    logger.info("Shutting down server, persisting remaining data...")
    buffer = []
    while not ingest_queue.empty():
        buffer.append(ingest_queue.get())

    if buffer:
        with get_connection() as conn:
            _persist_data(buffer, conn)

            cursor = conn.execute("SELECT COUNT(*) FROM sensor_measurements")
            logger.info(
                "Total records persisted in shutdown_handler: %s", cursor.fetchone()[0]
            )


atexit.register(shutdown_handler)


def main() -> None:
    initialize_db()
    threading.Thread(target=_process_queued_data, daemon=True).start()
    with socketserver.ThreadingUDPServer(("0.0.0.0", 12345), MyUDPHandler) as server:
        logger.info("Server started at localhost:12345")
        server.serve_forever()


if __name__ == "__main__":
    main()

Client:

import socket
import logging
import time
import random
import json

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(thread)d | %(message)s",
)
logger = logging.getLogger(__name__)


def main() -> None:
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    addr = ("localhost", 12345)
    logger.info("Connected to server: %s", addr)

    try:
        while True:
            data = {
                "sensor_id": random.randint(10, 1000),
                "temperature": random.uniform(1, 35),
                "humidity": random.uniform(1, 12),
                "ingested_at": time.time(),
            }
            client_socket.sendto(json.dumps(data).encode(), addr)
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("\nClient interrupted and shutting down.")
    finally:
        client_socket.close()


if __name__ == "__main__":
    main()

Final Thoughts:

This small project helped me understand where atexit can fit in a typical app: cleaning up or persisting data before shutdown. In this case, it ensures any remaining items in the queue are flushed to the database when the server exits normally.

If you’re building something that needs graceful shutdown (like logging, data persistence, or closing open connections), atexit is a simple and elegant tool to consider.