🍊 Programmer’s Picnic • Pandas Projects

Difficult Project — IoT Anomaly Monitor

MongoDB/JSONL → Pandas rolling anomaly detection → Matplotlib trend plot → export CSV/JSON/MongoDB/MySQL/PNG.

Pandas • Matplotlib CSV • JSON • MySQL • MongoDB Exports: CSV • JSON • MySQL • MongoDB • PNG

Goal

Build an IoT Anomaly Monitor for time-series sensor readings. Ingest readings from MongoDB (or JSONL first), compute rolling statistics, flag anomalies, plot daily temperature trend, and export anomalies to CSV + JSON + MongoDB + MySQL + PNG.

Difficulty: Difficult • Time-series + rolling stats + multi-db exports.

Data entry sources

Start with the included iot_readings.jsonl, then switch to MongoDB input later.

📥 Read JSONL (one JSON per line)
import json
import pandas as pd

rows = []
with open("../data/iot_readings.jsonl","r",encoding="utf-8") as f:
    for line in f:
        if line.strip():
            rows.append(json.loads(line))

df = pd.DataFrame(rows)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
print(df.head())
print(df.info())
🍃 Read from MongoDB
import pandas as pd
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
col = client["pp_iot"]["readings"]

docs = list(col.find({}, {"_id": 0}))
df = pd.DataFrame(docs)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")

print(df.head())

Pandas processing (rolling anomalies)

Rolling mean/std per sensor. Flag anomaly if |z| > 3.

🧠 Rolling z-score anomaly detection
import pandas as pd

df = pd.read_json("../data/iot_readings.jsonl", lines=True)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
df = df.dropna(subset=["ts"]).sort_values(["sensor_id","ts"])

window = 36  # 6 hours if data is every 10 mins
grp = df.groupby("sensor_id", group_keys=False)

df["temp_mean"] = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).mean())
df["temp_std"]  = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).std())
df["temp_z"]    = (df["temp_c"] - df["temp_mean"]) / df["temp_std"]

df["is_anomaly"] = df["temp_z"].abs() > 3
anoms = df[df["is_anomaly"]][["ts","sensor_id","site","temp_c","temp_z"]]
print(anoms.head())

Charts (Matplotlib) → PNG

Plot daily average temperature for one sensor and save PNG.

📈 Daily temp trend
import pandas as pd
import matplotlib.pyplot as plt
import os

os.makedirs("../output", exist_ok=True)

df = pd.read_json("../data/iot_readings.jsonl", lines=True)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
df = df.dropna(subset=["ts"])

sensor = df["sensor_id"].value_counts().index[0]
one = df[df["sensor_id"] == sensor].set_index("ts").sort_index()
daily = one["temp_c"].resample("D").mean()

plt.figure()
plt.plot(daily.index, daily.values, marker="o")
plt.title(f"Daily Avg Temp Trend — {sensor}")
plt.xlabel("Date")
plt.ylabel("Temp (°C)")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("../output/iot_daily_temp_trend.png", dpi=200)
plt.show()

Exports to CSV / JSON / MongoDB / MySQL

💾 Export anomalies to CSV + JSON
import pandas as pd
import os

os.makedirs("../output", exist_ok=True)

df = pd.read_json("../data/iot_readings.jsonl", lines=True)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
df = df.dropna(subset=["ts"]).sort_values(["sensor_id","ts"])

window = 36
grp = df.groupby("sensor_id", group_keys=False)
df["temp_mean"] = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).mean())
df["temp_std"]  = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).std())
df["temp_z"]    = (df["temp_c"] - df["temp_mean"]) / df["temp_std"]
df["is_anomaly"] = df["temp_z"].abs() > 3

anoms = df[df["is_anomaly"]][["ts","sensor_id","site","temp_c","temp_z"]]
anoms.to_csv("../output/iot_anomalies.csv", index=False)
anoms.to_json("../output/iot_anomalies.json", orient="records", indent=2)
🍃 Export to MongoDB
import pandas as pd
from pymongo import MongoClient

df = pd.read_json("../output/iot_anomalies.json")

client = MongoClient("mongodb://localhost:27017")
col = client["pp_iot"]["anomalies"]

col.delete_many({})
if len(df) > 0:
    col.insert_many(df.to_dict(orient="records"))

print("Mongo export done ✅")
🗄 Export to MySQL
import pandas as pd
from sqlalchemy import create_engine

MYSQL_URL = "mysql+pymysql://USER:PASSWORD@localhost:3306/DBNAME"

df = pd.read_json("../output/iot_anomalies.json")
engine = create_engine(MYSQL_URL, pool_pre_ping=True)

df.to_sql("pp_iot_anomalies", con=engine, if_exists="replace", index=False)
print("MySQL export done ✅")

Run steps

  1. Install deps: pip install -r requirements.txt
  2. Run JSONL version first: python difficult_iot_anomaly_monitor.py
  3. Enable Mongo input/export by editing URLs inside script
  4. Enable MySQL export similarly (edit MYSQL_URL)
  5. Check output/ for CSV/JSON/PNG