Async Batch Processing Pipelines for Indoor Mapping & Wayfinding
Indoor mapping operations routinely ingest hundreds of heterogeneous floor plans per deployment cycle. Synchronous processing models introduce unacceptable latency, block event loops during heavy geometry operations, and create cascading failures when upstream CAD/BIM sources throttle throughput. A production-grade async batch architecture decouples ingestion, parsing, vectorization, and topology construction into discrete, non-blocking stages. Within the broader Automated Floor Plan Parsing & Vectorization ecosystem, the pipeline must enforce strict memory boundaries, handle backpressure gracefully, and maintain deterministic output schemas for downstream GIS consumption.
Pipeline Architecture & Ingestion Strategy
The ingestion layer operates as a coroutine-driven file scanner that populates a bounded asyncio.Queue. By limiting queue depth to 2 * worker_count, the system prevents unbounded memory allocation when scanning network-mounted directories or cloud storage buckets. Each queued task carries a lightweight manifest: file path, source format, floor identifier, and a UUID for distributed tracing. This manifest-driven approach ensures that heavy I/O operations are isolated from CPU-bound vectorization routines, allowing the event loop to remain responsive while workers process geometry.
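The backpressure behaviour described above can be seen in isolation with a short sketch. The names scan, drain, WORKER_COUNT and the generated file names are hypothetical and chosen only for illustration; the only real API involved is asyncio.Queue with a maxsize.

```python
import asyncio

WORKER_COUNT = 2                 # assumed value, for illustration only
QUEUE_DEPTH = 2 * WORKER_COUNT   # the 2 * worker_count bound from the text


async def scan(queue: asyncio.Queue, paths: list[str], depths: list[int]) -> None:
    """Producer: put() suspends whenever the queue already holds QUEUE_DEPTH items."""
    for path in paths:
        await queue.put(path)
        depths.append(queue.qsize())  # record the depth after each insert


async def drain(queue: asyncio.Queue, seen: list[str]) -> None:
    """Consumer: a deliberately slow worker, so the producer is forced to wait."""
    while True:
        path = await queue.get()
        await asyncio.sleep(0.01)
        seen.append(path)
        queue.task_done()


async def demo(n_files: int = 20) -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_DEPTH)
    depths: list[int] = []
    seen: list[str] = []
    paths = [f"floor_{i}.dwg" for i in range(n_files)]
    consumers = [asyncio.create_task(drain(queue, seen)) for _ in range(WORKER_COUNT)]
    await scan(queue, paths, depths)
    await queue.join()            # every queued path has been processed
    for consumer in consumers:
        consumer.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return max(depths), len(seen)


if __name__ == "__main__":
    peak, processed = asyncio.run(demo())
    print(f"peak queue depth={peak}, processed={processed}")
```

However many paths the scanner discovers, the recorded depth never exceeds QUEUE_DEPTH, which is what keeps memory bounded on large directory trees.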
Format-specific ingestion requires tailored extraction logic. For instance, rasterized PDFs and vector-based DWG files demand different read strategies before they can enter the unified processing stream. Refer to SVG/DWG Parsing Workflows for format-specific normalization routines that should execute prior to queue insertion.
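One way to route files to a format-specific routine before queue insertion is a suffix-keyed dispatch table. This is only a sketch: normalize_vector and normalize_raster are hypothetical placeholders, not the routines from the referenced workflows, and the mapping of PDF to the raster path assumes rasterized PDFs as in the example above.

```python
from pathlib import Path
from typing import Callable, Dict


def normalize_vector(path: Path) -> str:
    """Hypothetical placeholder for a vector read strategy (DWG, SVG)."""
    return f"vector:{path.name}"


def normalize_raster(path: Path) -> str:
    """Hypothetical placeholder for a raster read strategy (scanned PDF)."""
    return f"raster:{path.name}"


# Suffix -> normalizer. Extend this table as new source formats are onboarded.
NORMALIZERS: Dict[str, Callable[[Path], str]] = {
    ".dwg": normalize_vector,
    ".svg": normalize_vector,
    ".pdf": normalize_raster,  # assumption: PDFs are rasterized scans
}


def normalize(path: Path) -> str:
    """Pick the normalizer by lower-cased suffix; reject unknown formats early."""
    try:
        handler = NORMALIZERS[path.suffix.lower()]
    except KeyError:
        raise ValueError(f"Unsupported format: {path.suffix!r}") from None
    return handler(path)


if __name__ == "__main__":
    print(normalize(Path("plans/L1_A.DWG")))
```

Rejecting unsupported suffixes at this point keeps malformed inputs out of the queue, so workers only ever see formats they can handle.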
Asynchronous Worker Orchestration & Concurrency Control
Floor plan processing exhibits a hybrid workload profile: file I/O and database writes are inherently asynchronous, while geometric operations (polygonization, snapping, intersection testing) are CPU-bound. Python’s asyncio runtime executes coroutines on a single thread and cannot parallelize CPU work on its own, so heavy geometry must be delegated explicitly: to a process pool for pure-Python kernels, or to a thread pool only when the underlying library releases the GIL. The architectural blueprint outlined in Building async pipelines for batch floor plan processing demonstrates how to bridge this gap using loop.run_in_executor paired with a ProcessPoolExecutor for heavy geometry kernels.
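As a minimal illustration of that delegation, the sketch below pushes a small CPU-bound function through loop.run_in_executor. The shoelace-area function polygon_area and the wrapper area_off_loop are hypothetical stand-ins for a real geometry kernel, not code from the referenced blueprint.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional, Sequence, Tuple

Point = Tuple[float, float]


def polygon_area(ring: Sequence[Point]) -> float:
    """CPU-bound stand-in: shoelace area of a ring given as (x, y) vertices."""
    total = 0.0
    for (x1, y1), (x2, y2) in zip(ring, list(ring[1:]) + [ring[0]]):
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0


async def area_off_loop(executor: Optional[Executor], ring: Sequence[Point]) -> float:
    """Run the kernel in the given executor so the event loop stays free.

    Passing None falls back to the loop's default thread pool.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, polygon_area, ring)


async def main() -> None:
    room = [(0.0, 0.0), (4.0, 0.0), (4.0, 3.0), (0.0, 3.0)]
    # Module-level functions and plain tuples pickle cleanly across processes.
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(await area_off_loop(pool, room))
```

Run it with asyncio.run(main()) from a script guarded by if __name__ == "__main__", which process pools need on platforms that spawn rather than fork. The kernel must be a module-level function because its arguments and return value are pickled between processes.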
Concurrency control is enforced via asyncio.Semaphore. Without explicit throttling, workers can exhaust file descriptors, saturate disk I/O bandwidth, or trigger OS-level OOM kills when processing multi-story IFC exports. A semaphore initialized to the physical core count (or a tuned fraction for containerized deployments) guarantees that no more than that many vectorization tasks run concurrently. For detailed tuning strategies on worker pool sizing and memory footprint reduction, consult Optimizing Python async workers for parsing.
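The effect of the semaphore can be checked with a small experiment that records peak concurrency. The helper names (concurrency_limit, vectorize, demo) are hypothetical. Note that os.cpu_count() reports logical CPUs and does not reflect container CPU quotas, so the fraction parameter here is an assumed tuning knob rather than a recommended value.

```python
import asyncio
import os
from typing import Dict


def concurrency_limit(fraction: float = 1.0) -> int:
    """Derive a semaphore size from the logical CPU count, never below 1."""
    cpus = os.cpu_count() or 1
    return max(1, int(cpus * fraction))


async def vectorize(sem: asyncio.Semaphore, state: Dict[str, int]) -> None:
    """Simulated vectorization task that tracks how many peers run at once."""
    async with sem:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for awaiting the geometry kernel
        state["running"] -= 1


async def demo(limit: int, n_tasks: int = 10) -> int:
    """Launch n_tasks at once and return the highest concurrency observed."""
    sem = asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(vectorize(sem, state) for _ in range(n_tasks)))
    return state["peak"]


if __name__ == "__main__":
    limit = concurrency_limit(0.5)
    print(f"limit={limit}, observed peak={asyncio.run(demo(limit))}")
```

Even though all tasks are scheduled immediately, the observed peak never rises above the semaphore's initial value.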
Production Implementation: Runnable Pipeline
The following implementation provides a complete, runnable async batch processor in which the geometry kernel is simulated and meant to be swapped for real parsing code. It integrates bounded queues, semaphore-controlled concurrency, process pool delegation for CPU-bound geometry, and graceful cancellation handling.
import asyncio
import json
import logging
import time
import uuid
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict

import aiofiles  # third-party: pip install aiofiles

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
logger = logging.getLogger("indoor_pipeline")

SUPPORTED_SUFFIXES = {".dwg", ".svg", ".pdf", ".ifc"}
MAX_RETRIES = 2


# --- Data Structures ---
@dataclass
class TaskManifest:
    file_path: Path
    source_format: str
    floor_id: str
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    retry_count: int = 0


# --- CPU-Bound Geometry Kernel (Runs in separate process) ---
def process_geometry_kernel(file_path: str, source_format: str) -> Dict[str, Any]:
    """
    Simulates heavy CPU-bound geometry processing.
    Replace with actual Shapely/GeoPandas/IFCopenshell routines.
    """
    logger.info(f"[CPU Kernel] Processing {file_path} ({source_format})")
    time.sleep(0.5)  # Simulate polygonization, snapping, topology validation
    return {
        "status": "success",
        "geometry_count": 142,
        "topology_valid": True,
        "floor_id": Path(file_path).stem.split("_")[0],
    }


# --- Async Pipeline Components ---
class AsyncBatchPipeline:
    def __init__(self, max_workers: int = 4, queue_depth: int = 8):
        self.max_workers = max_workers
        self.queue_depth = queue_depth
        self.task_queue: asyncio.Queue[TaskManifest] = asyncio.Queue(maxsize=queue_depth)
        self.semaphore = asyncio.Semaphore(max_workers)
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    async def ingest_scanner(self, scan_dir: Path) -> None:
        """Coroutine-driven directory scanner with backpressure."""
        logger.info(f"Starting ingestion scan: {scan_dir}")
        # Note: rglob itself is synchronous; on slow network mounts it can stall the loop.
        for fp in scan_dir.rglob("*"):
            if fp.suffix.lower() in SUPPORTED_SUFFIXES:
                manifest = TaskManifest(
                    file_path=fp,
                    source_format=fp.suffix.lstrip(".").lower(),
                    floor_id=fp.stem.split("_")[0],
                )
                # await blocks until queue has space (backpressure)
                await self.task_queue.put(manifest)
                logger.debug(f"Enqueued: {fp.name} | Queue size: {self.task_queue.qsize()}")
        # No sentinel is queued: run() joins the queue and then cancels idle workers.

    async def worker(self, worker_id: int) -> None:
        """Drains queue, respects semaphore, delegates CPU work."""
        loop = asyncio.get_running_loop()
        while True:
            manifest = await self.task_queue.get()
            name = manifest.file_path.name
            try:
                # Retry in place: re-queuing from a worker can deadlock when the
                # bounded queue is full and every worker is blocked on put().
                while True:
                    try:
                        async with self.semaphore:
                            logger.info(f"Worker-{worker_id} | Processing {name}")
                            # Delegate CPU-bound work to process pool
                            result = await loop.run_in_executor(
                                self.executor,
                                process_geometry_kernel,
                                str(manifest.file_path),
                                manifest.source_format,
                            )
                            # Async export / DB write
                            await self._export_to_gis(result, manifest.trace_id)
                        logger.info(f"Worker-{worker_id} | Completed {name}")
                        break
                    except Exception as e:
                        logger.error(f"Worker-{worker_id} | Failed {name}: {e}")
                        if manifest.retry_count >= MAX_RETRIES:
                            break
                        manifest.retry_count += 1
            finally:
                # Exactly one task_done() per get(), including on cancellation
                self.task_queue.task_done()

    async def _export_to_gis(self, geo_result: Dict[str, Any], trace_id: str) -> None:
        """Simulate async GIS export / topology registration."""
        async with aiofiles.open(f"output_{trace_id}.json", "w") as f:
            # json.dumps (not str) so the file is valid JSON
            await f.write(json.dumps(geo_result))

    async def run(self, scan_dir: Path) -> None:
        """Orchestrate pipeline lifecycle."""
        # Start workers
        workers = [asyncio.create_task(self.worker(i)) for i in range(self.max_workers)]
        # Start scanner
        scanner = asyncio.create_task(self.ingest_scanner(scan_dir))
        try:
            # Wait for scanner to finish
            await scanner
            # Wait for all queued tasks to complete
            await self.task_queue.join()
        finally:
            # Workers are now idle in get(); cancel them and release the pool
            for w in workers:
                w.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
            self.executor.shutdown(wait=True)
        logger.info("Pipeline completed.")


# --- Entry Point ---
async def main():
    scan_directory = Path("./floor_plans_input")
    scan_directory.mkdir(exist_ok=True)
    # Create dummy files for demonstration; the prefix before the first
    # underscore is used as the floor identifier (L0, L1, ...)
    for i in range(12):
        (scan_directory / f"L{i}_A.dwg").touch()
    # The pipeline is built inside the running loop it will use
    pipeline = AsyncBatchPipeline(max_workers=4, queue_depth=8)
    await pipeline.run(scan_directory)


if __name__ == "__main__":
    asyncio.run(main())
Downstream Topology & GIS Integration
Once vectorized, parsed floor plans feed directly into spatial analysis engines. The pipeline’s deterministic output schema ensures seamless handoff to Wall & Door Detection Algorithms, which rely on clean, non-overlapping polygon geometries and consistent attribute mapping.
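A deterministic schema can be approximated by fixing both the field set and the serialization order. The sketch below reuses the fields returned by the simulated kernel above. The FloorResult class and the serialize helper are hypothetical names, not an established schema.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class FloorResult:
    """Hypothetical output record; fields mirror the simulated kernel's dict."""
    floor_id: str
    geometry_count: int
    topology_valid: bool
    status: str = "success"


def serialize(result: FloorResult) -> str:
    """Sorted keys and fixed separators give byte-identical output for equal records."""
    return json.dumps(asdict(result), sort_keys=True, separators=(",", ":"))


if __name__ == "__main__":
    print(serialize(FloorResult(floor_id="L1", geometry_count=142, topology_valid=True)))
```

Because equal records always serialize to the same bytes, downstream consumers can diff or hash outputs to detect real changes between pipeline runs.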
For large-scale campus deployments or multi-story BIM environments, scaling strategies must account for IFC schema complexity and spatial reference transformations. See Scaling Python workers for IFC parsing for memory-mapped IFC extraction patterns that prevent worker thrashing.
The pipeline’s trace IDs enable real-time topology updates in wayfinding engines. By publishing completion events to a message broker (e.g., Redis Streams or RabbitMQ), indoor navigation teams can trigger incremental graph rebuilds without requiring full dataset reloads.
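The completion-event idea can be sketched without committing to a particular broker client. Everything named here is an assumption for illustration: the event fields, the channel name floorplan.completed, and the InMemoryBroker stand-in. A real deployment would pass in a coroutine that wraps its Redis Streams or RabbitMQ client instead.

```python
import asyncio
import json
import time
from typing import Awaitable, Callable, Dict, List, Tuple

# Any coroutine taking (channel, payload) can act as the publisher.
Publish = Callable[[str, str], Awaitable[None]]

CHANNEL = "floorplan.completed"  # hypothetical channel / stream name


def completion_event(trace_id: str, floor_id: str) -> Dict[str, object]:
    """Build the event body; the field names are illustrative, not a fixed contract."""
    return {
        "type": "floor.vectorized",
        "trace_id": trace_id,
        "floor_id": floor_id,
        "completed_at": time.time(),
    }


async def announce(publish: Publish, trace_id: str, floor_id: str) -> None:
    """Serialize the event and hand it to whichever broker client was injected."""
    payload = json.dumps(completion_event(trace_id, floor_id), sort_keys=True)
    await publish(CHANNEL, payload)


class InMemoryBroker:
    """Test double that records messages instead of sending them anywhere."""

    def __init__(self) -> None:
        self.messages: List[Tuple[str, str]] = []

    async def publish(self, channel: str, payload: str) -> None:
        self.messages.append((channel, payload))


if __name__ == "__main__":
    broker = InMemoryBroker()
    asyncio.run(announce(broker.publish, "trace-123", "L2"))
    print(broker.messages)
```

Carrying the trace ID in every event lets the wayfinding side correlate a graph rebuild with the exact pipeline task that triggered it.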
Operational Troubleshooting & Runbook
| Symptom | Root Cause | Resolution |
|---|---|---|
| EventLoopBlockedWarning or high latency spikes | CPU-bound geometry running on main thread | Ensure all heavy operations use loop.run_in_executor with ProcessPoolExecutor. Never call Shapely/IFCopenshell synchronously inside async functions. |
| OSError: [Errno 24] Too many open files | Unbounded concurrent file handles | Lower max_workers, increase OS ulimit -n, and ensure aiofiles contexts are properly closed. Use asyncio.Semaphore to cap concurrent I/O. |
| Queue starvation / workers idle | Scanner coroutine yielding too slowly or blocking on network mounts | Implement async directory traversal (aiopath or aiofiles.os.scandir), add timeout guards, and verify network storage latency. |
| MemoryError / OOM kills during IFC processing | Process pool workers loading entire BIM models into RAM | Switch to chunked IFC parsing, enable multiprocessing.shared_memory for large arrays, and enforce strict queue_depth limits. |
| CancelledError propagating unexpectedly | Improper task cancellation during graceful shutdown | Use asyncio.gather(*tasks, return_exceptions=True) and wrap critical export steps in asyncio.shield() to prevent partial writes. |
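The asyncio.shield() advice in the last row can be illustrated with a small sketch. The function names and the simulated export are hypothetical. The point is only that a cancelled worker lets its in-flight write finish before the cancellation propagates.

```python
import asyncio
from typing import List


async def export_floor(written: List[str], name: str) -> None:
    """Stand-in for a multi-step export; the sleep is where cancellation could land."""
    await asyncio.sleep(0.05)
    written.append(name)


async def worker(written: List[str], name: str) -> None:
    export = asyncio.create_task(export_floor(written, name))
    try:
        # shield() lets the outer await be cancelled while the export task keeps running.
        await asyncio.shield(export)
    except asyncio.CancelledError:
        await export  # let the write finish, then honour the cancellation
        raise


async def demo() -> List[str]:
    written: List[str] = []
    task = asyncio.create_task(worker(written, "L1_A"))
    await asyncio.sleep(0.01)  # give the export time to start
    task.cancel()              # simulate shutdown arriving mid-export
    await asyncio.gather(task, return_exceptions=True)
    return written


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Without the shield, the cancellation would interrupt export_floor during its sleep and nothing would be recorded. With it, the worker still ends up cancelled, but only after the write has completed.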
Proactive Monitoring:
- Track queue.qsize() vs queue.maxsize to detect backpressure bottlenecks.
- Log executor task duration percentiles; if P95 exceeds 5s, increase ProcessPoolExecutor workers or optimize geometry kernels.
- Use tracemalloc in staging to identify memory leaks in long-running worker processes.
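The first two monitoring checks reduce to simple arithmetic over collected samples. The sketch below uses a nearest-rank percentile. The helper names are hypothetical, and the 5-second threshold is the one quoted in the list above.

```python
from typing import List


def percentile(samples: List[float], pct: int) -> float:
    """Nearest-rank percentile; pct is an integer between 1 and 100."""
    if not samples:
        raise ValueError("no samples recorded")
    ordered = sorted(samples)
    rank = (pct * len(ordered) + 99) // 100  # integer ceil(pct * n / 100)
    return ordered[rank - 1]


def queue_fill_ratio(qsize: int, maxsize: int) -> float:
    """Fraction of the bounded queue in use; values near 1.0 mean workers are the bottleneck."""
    return qsize / maxsize if maxsize > 0 else 0.0


def p95_alert(durations_s: List[float], threshold_s: float = 5.0) -> bool:
    """True when the P95 executor task duration exceeds the threshold."""
    return percentile(durations_s, 95) > threshold_s


if __name__ == "__main__":
    durations = [0.8, 1.1, 0.9, 6.2, 1.0, 1.3, 0.7, 1.2, 0.9, 1.0]
    print(percentile(durations, 95), p95_alert(durations), queue_fill_ratio(6, 8))
```

In a running pipeline, the durations would come from timing each run_in_executor call and the fill ratio from sampling the queue periodically.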
For authoritative reference on async queue semantics and executor lifecycle management, consult the official asyncio Queue documentation and concurrent.futures documentation.