Building a Real-Time Wireless Access Points Monitoring System Using Python and RabbitMQ
Introduction
In this post, we’ll take you through the process of building a real-time monitoring system for wireless access points (APs) using Python. The system comprises two applications:
- `app_a`: Monitors a JSON file for changes in the APs.
- `app_b`: Displays these changes as notified by `app_a`.
We use RabbitMQ as the messaging broker between these applications to facilitate reliable and scalable communication. Additionally, Docker is used to containerize the applications for easy deployment and scaling.
Project Overview
The monitoring system is designed to:
- Watch a JSON file (`access_points.json`) containing information about various wireless APs.
- Detect changes (SNR, channel updates, addition, or removal of APs).
- Send notifications of these changes to a message queue (RabbitMQ).
- Listen to these notifications with a second application (`app_b`) and display them in a human-readable format.
- Support multiple instances of the listener application (`app_b`) running across different environments.
Prerequisites
- Python 3.9+
- Docker and Docker Compose
- RabbitMQ installed and running (can be set up with Docker)
Step-by-Step Implementation
Step 1: Setting Up the JSON File (`access_points.json`)
Create a JSON file named `access_points.json` to store the information about surrounding wireless APs. Here’s a sample structure:
{
    "access_points": [
        {
            "ssid": "MyAP",
            "snr": 82,
            "channel": 11
        },
        {
            "ssid": "NewAP",
            "snr": 70,
            "channel": 3
        }
    ]
}
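`app_a` will need to parse this file into a structure that is easy to diff; a possible `load_access_points` helper is sketched below. The method name matches the calls shown in Step 2, but this standalone version and its error handling are assumptions.

```python
import json
import logging


def load_access_points(file_path="access_points.json"):
    """Load the access points from the JSON file.

    Returns a dict keyed by SSID so later comparisons are simple
    lookups, or an empty dict if the file is missing or invalid.
    """
    try:
        with open(file_path) as f:
            data = json.load(f)
        # Key each AP entry by its SSID for O(1) change lookups
        return {ap["ssid"]: ap for ap in data.get("access_points", [])}
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logging.error(f"Could not read {file_path}: {e}")
        return {}
```

Keying by SSID makes the change detection in Step 2 straightforward: removed, added, and modified APs fall out of plain set operations on the keys.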
Step 2: Creating `app_a.py` (The Monitoring Application)
`app_a.py` is responsible for monitoring changes in `access_points.json`. It compares the current state of the file with its previous state and sends a message to RabbitMQ whenever a change is detected. You can find the script here.
Key functionalities:
- File Monitoring: Continuously monitors a JSON file (`access_points.json`) for changes every 5 seconds, identifying new, removed, or modified access points (changes in SNR or channel).
- Change Detection: Compares current access point data with previous data to detect additions, removals, or property changes and generates descriptive change logs.
- RabbitMQ Connection: Connects to RabbitMQ using environment variables, with retry logic (up to 5 attempts) to handle connection failures.
- Message Publishing: Publishes detected changes to a RabbitMQ `fanout` exchange, ensuring all connected consumers receive the updates, and closes the connection after publishing.
- Logging: Logs information and errors throughout the process for file monitoring, change detection, and RabbitMQ operations, aiding in troubleshooting.
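The `compare_access_points` method itself isn't reproduced in this post. As a rough sketch, here it is written as a standalone function over two `{ssid: info}` dicts (in the actual class it would compare against `self.previous_data`); the message wording is modeled on the expected output in Step 9 and is otherwise an assumption.

```python
def compare_access_points(previous, current):
    """Compare two {ssid: ap_info} dicts and describe what changed."""
    changes = []
    # APs that disappeared from the file
    for ssid in previous.keys() - current.keys():
        changes.append(f"{ssid} is removed from the list")
    # APs that newly appeared in the file
    for ssid in current.keys() - previous.keys():
        ap = current[ssid]
        changes.append(f"{ssid} is added to the list with SNR {ap['snr']} and channel {ap['channel']}")
    # APs present in both snapshots: check SNR and channel updates
    for ssid in previous.keys() & current.keys():
        old, new = previous[ssid], current[ssid]
        if old["snr"] != new["snr"]:
            changes.append(f"{ssid}'s SNR has changed from {old['snr']} to {new['snr']}")
        if old["channel"] != new["channel"]:
            changes.append(f"{ssid}'s channel has changed from {old['channel']} to {new['channel']}")
    return changes
```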
def monitor_file(self):
    """Monitor the access points JSON file for changes."""
    logging.info(f"Starting to monitor {self.file_path} for changes.")
    self.previous_data = self.load_access_points()
    while True:
        time.sleep(5)  # Check for changes every 5 seconds
        current_data = self.load_access_points()
        if current_data != self.previous_data:
            changes = self.compare_access_points(current_data)
            if changes:
                logging.info(f"Detected changes: {changes}")
                self.notify_app_b(changes)
            self.previous_data = current_data
This function checks the JSON file for changes every 5 seconds; if anything in the file has changed, it calls the notify_app_b method to notify the app_b instances.
def notify_app_b(self, changes):
    """Notify App B of the changes by sending data through RabbitMQ."""
    # Step 1: Establish a connection to RabbitMQ
    connection = self.connect_to_rabbitmq()
    # Step 2: Publish changes if connection was successful
    if connection:
        self.publish_changes(connection, changes)
    else:
        logging.error("Cannot notify App B. No RabbitMQ connection established.")
def connect_to_rabbitmq(self):
    """Establish a connection to RabbitMQ and return the connection object."""
    for attempt in range(5):  # Retry up to 5 times
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=Config.RABBITMQ_SERVER,
                    port=5672,
                    credentials=pika.PlainCredentials(Config.RABBITMQ_USER, Config.RABBITMQ_PASSWORD)
                )
            )
            return connection
        except Exception as e:
            logging.error(f"Failed to connect to RabbitMQ: {e}")
            time.sleep(5)  # Wait 5 seconds before retrying
    logging.error("Failed to connect to RabbitMQ after multiple attempts.")
    return None
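The fixed 5-second retry shown above works fine; if you'd rather have the delay grow between attempts, a generic exponential-backoff wrapper is one option. This helper is an addition for illustration, not part of the original script.

```python
import time
import logging


def connect_with_backoff(connect_fn, max_attempts=5, base_delay=1.0):
    """Retry a connection function with exponential backoff.

    connect_fn should return a connection object or raise on failure.
    Delays grow as base_delay * 2**attempt (1s, 2s, 4s, ...).
    Returns the connection, or None after max_attempts failures.
    """
    for attempt in range(max_attempts):
        try:
            return connect_fn()
        except Exception as e:
            logging.error(f"Connection attempt {attempt + 1} failed: {e}")
            time.sleep(base_delay * 2 ** attempt)
    logging.error(f"Giving up after {max_attempts} attempts.")
    return None
```

In `app_a` this could wrap the `pika.BlockingConnection(...)` call, keeping the retry policy separate from the connection details.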
def publish_changes(self, connection, changes):
    """Publish the changes to the RabbitMQ exchange using an established connection."""
    try:
        channel = connection.channel()
        # Declare an exchange type
        channel.exchange_declare(exchange=self.exchange_name, exchange_type=self.exchange_type)
        for change in changes:
            # Publish each change to the exchange
            channel.basic_publish(exchange=self.exchange_name, routing_key='', body=change)
            logging.info(f"Sent to exchange: {change}")
    except Exception as e:
        logging.error(f"Error while publishing changes: {e}")
    finally:
        # Close the connection
        connection.close()
Step 3: Creating `app_b.py` (The Display/Consumer Application)
`app_b.py` listens for messages from RabbitMQ and displays the detected changes. Since `app_b` might run as multiple instances, using RabbitMQ ensures that all instances receive the notifications. You can find the script here.
Key functionalities:
- Message Consumption: Listen to the RabbitMQ queue for messages from `app_a`.
- Display Changes: Process and display the changes in a human-readable format.
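The `self.callback` handler referenced in the consumer code below isn't reproduced in this post; a minimal version following pika's `on_message_callback` signature could look like this (the exact implementation in the script may differ).

```python
import logging


def callback(ch, method, properties, body):
    """Handle one message delivered by RabbitMQ.

    pika passes the channel, delivery metadata, message properties,
    and the raw body (bytes); we decode the body and display it.
    """
    message = body.decode("utf-8")
    logging.info(f"Change received: {message}")
    print(message)
```

Because `auto_ack=True` is used when consuming, no explicit acknowledgement is needed here.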
def start_listening(self):
    """Start listening to the RabbitMQ exchange for messages."""
    if not self.connect():
        logging.error("Failed to connect to RabbitMQ after multiple attempts.")
        return
    # Create a temporary queue with a random name
    result = self.channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    # Bind the queue to the exchange
    self.channel.queue_bind(exchange=self.exchange_name, queue=queue_name)
    logging.info(f"Listening for messages on exchange '{self.exchange_name}'...")
    # Subscribe to the queue
    self.channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=True)
    try:
        # Start consuming messages
        self.channel.start_consuming()
    except KeyboardInterrupt:
        logging.info("Listener stopped.")
    finally:
        if self.connection:
            self.connection.close()
Step 4: Configuration Files (`config.py` and `.env`)
Use a `config.py` file to store configuration details, such as the RabbitMQ connection settings. Additionally, create a `.env` file for storing sensitive information (e.g., credentials).
#.env
RABBITMQ_USER=user
RABBITMQ_PASSWORD=password
RABBITMQ_SERVER=rabbitmq
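The post doesn't show `config.py` itself; a minimal version exposing the `Config` attributes used by the snippets above could be as simple as this (attribute names are taken from the code in Step 2; the fallback defaults are assumptions for local runs).

```python
# config.py
import os


class Config:
    """Settings populated from environment variables.

    In Docker, the values come from the .env file via docker-compose's
    env_file directive; the defaults are fallbacks for local runs.
    """
    RABBITMQ_SERVER = os.environ.get("RABBITMQ_SERVER", "localhost")
    RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "guest")
    RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "guest")
```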
Step 5: Containerizing Applications with Docker
Create a `Dockerfile` for each application (`app_a.py` and `app_b.py`) to containerize them for easier deployment.
# app_a/Dockerfile
FROM python:3.9-slim
# Set the working directory
WORKDIR /app
# Copy the Python script and JSON file into the container
COPY * /app/
# Install necessary Python package
RUN pip install pika
# Run the application
CMD ["python3", "app_a.py"]
Repeat a similar process for `app_b.py`.
Step 6: Setting Up RabbitMQ with a Docker Compose File
To facilitate communication between `app_a` and `app_b`, RabbitMQ is used as the message broker. You can easily set up RabbitMQ using Docker:
- Create a Docker Compose file (`docker-compose.yml`) to define the RabbitMQ service:
version: '3.8' # Specify the version of the compose file
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_CONF: |
        deprecated_features.permit.management_metrics_collection = true
    networks:
      - ap_network
    healthcheck:
      test: ["CMD", "rabbitmqctl", "status"]
      interval: 10s
      retries: 5
      start_period: 30s
      timeout: 5s
Step 7: Running the Applications with Docker Compose
Define a `docker-compose.yml` file to run `app_a` and multiple instances of `app_b` along with RabbitMQ:
version: '3.8' # Specify the version of the compose file
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_CONF: |
        deprecated_features.permit.management_metrics_collection = true
    networks:
      - ap_network
    healthcheck:
      test: ["CMD", "rabbitmqctl", "status"]
      interval: 10s
      retries: 5
      start_period: 30s
      timeout: 5s
  app_a:
    build: ./app_a
    container_name: app_a
    depends_on:
      rabbitmq:
        condition: service_healthy
    env_file:
      - .env # Load environment variables from .env file
    volumes:
      - ./app_a/access_points.json:/app/access_points.json
    networks:
      - ap_network
  app_b:
    build: ./app_b
    depends_on:
      rabbitmq:
        condition: service_healthy
    env_file:
      - .env # Load environment variables from .env file
    networks:
      - ap_network
    deploy:
      replicas: 3
networks:
  ap_network:
    driver: bridge
Step 8: Run the System
Run the following command to start all the services.
docker-compose up --build
The command `docker-compose up --build` rebuilds the Docker images and starts the containers as defined in the `docker-compose.yml` file. It’s useful when you’ve made changes to the code or a Dockerfile and need to apply them.
Step 9: Testing the System
- Modify the `access_points.json` file to simulate changes. `app_a` will detect these changes and send notifications to RabbitMQ.
- Multiple instances of `app_b` will receive and display these changes.
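To simulate a change without hand-editing the file, a small helper script can bump one field; the file path and SSID below are examples from this post and may differ from your layout.

```python
import json


def bump_snr(file_path, ssid, new_snr):
    """Set a new SNR value for one AP in the JSON file that app_a watches."""
    with open(file_path) as f:
        data = json.load(f)
    for ap in data["access_points"]:
        if ap["ssid"] == ssid:
            ap["snr"] = new_snr
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)


if __name__ == "__main__":
    # app_a should pick this up within its 5-second polling interval
    bump_snr("app_a/access_points.json", "MyAP", 90)
```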
Expected output examples:
MyAP's SNR has changed from 63 to 82
NewAP's channel has changed from 3 to 6
HisAP is removed from the list
HerAP is added to the list with SNR 71 and channel 1
Conclusion
In this blog, we’ve covered the step-by-step implementation of a real-time wireless access points monitoring system using Python, RabbitMQ, and Docker. This project demonstrates how to:
- Monitor files for changes.
- Use RabbitMQ for decoupled, reliable communication between applications.
- Containerize applications with Docker for easy deployment and scaling.
You can find the full project here.