Goal
Build an IoT Anomaly Monitor for time-series sensor readings. Ingest readings from MongoDB (or JSONL first), compute rolling statistics, flag anomalies, plot daily temperature trend, and export anomalies to CSV + JSON + MongoDB + MySQL + PNG.
Difficulty: Difficult • Time-series + rolling stats + multi-db exports.
Data entry sources
Start with the included iot_readings.jsonl, then switch to MongoDB input later.
📥 Read JSONL (one JSON per line)
import json
import pandas as pd
rows = []
with open("../data/iot_readings.jsonl","r",encoding="utf-8") as f:
for line in f:
if line.strip():
rows.append(json.loads(line))
df = pd.DataFrame(rows)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
print(df.head())
print(df.info())
🍃 Read from MongoDB
import pandas as pd
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017")
col = client["pp_iot"]["readings"]
docs = list(col.find({}, {"_id": 0}))
df = pd.DataFrame(docs)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
print(df.head())
Pandas processing (rolling anomalies)
Rolling mean/std per sensor. Flag anomaly if |z| > 3.
🧠 Rolling z-score anomaly detection
import pandas as pd
df = pd.read_json("../data/iot_readings.jsonl", lines=True)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
df = df.dropna(subset=["ts"]).sort_values(["sensor_id","ts"])
window = 36 # 6 hours if data is every 10 mins
grp = df.groupby("sensor_id", group_keys=False)
df["temp_mean"] = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).mean())
df["temp_std"] = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).std())
df["temp_z"] = (df["temp_c"] - df["temp_mean"]) / df["temp_std"]
df["is_anomaly"] = df["temp_z"].abs() > 3
anoms = df[df["is_anomaly"]][["ts","sensor_id","site","temp_c","temp_z"]]
print(anoms.head())
Charts (Matplotlib) → PNG
Plot daily average temperature for one sensor and save PNG.
📈 Daily temp trend
import pandas as pd
import matplotlib.pyplot as plt
import os
os.makedirs("../output", exist_ok=True)
df = pd.read_json("../data/iot_readings.jsonl", lines=True)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
df = df.dropna(subset=["ts"])
sensor = df["sensor_id"].value_counts().index[0]
one = df[df["sensor_id"] == sensor].set_index("ts").sort_index()
daily = one["temp_c"].resample("D").mean()
plt.figure()
plt.plot(daily.index, daily.values, marker="o")
plt.title(f"Daily Avg Temp Trend — {sensor}")
plt.xlabel("Date")
plt.ylabel("Temp (°C)")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("../output/iot_daily_temp_trend.png", dpi=200)
plt.show()
Exports to CSV / JSON / MongoDB / MySQL
💾 Export anomalies to CSV + JSON
import pandas as pd
import os
os.makedirs("../output", exist_ok=True)
df = pd.read_json("../data/iot_readings.jsonl", lines=True)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
df = df.dropna(subset=["ts"]).sort_values(["sensor_id","ts"])
window = 36
grp = df.groupby("sensor_id", group_keys=False)
df["temp_mean"] = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).mean())
df["temp_std"] = grp["temp_c"].apply(lambda s: s.rolling(window, min_periods=12).std())
df["temp_z"] = (df["temp_c"] - df["temp_mean"]) / df["temp_std"]
df["is_anomaly"] = df["temp_z"].abs() > 3
anoms = df[df["is_anomaly"]][["ts","sensor_id","site","temp_c","temp_z"]]
anoms.to_csv("../output/iot_anomalies.csv", index=False)
anoms.to_json("../output/iot_anomalies.json", orient="records", indent=2)
🍃 Export to MongoDB
import pandas as pd
from pymongo import MongoClient
df = pd.read_json("../output/iot_anomalies.json")
client = MongoClient("mongodb://localhost:27017")
col = client["pp_iot"]["anomalies"]
col.delete_many({})
if len(df) > 0:
col.insert_many(df.to_dict(orient="records"))
print("Mongo export done ✅")
🗄 Export to MySQL
import pandas as pd
from sqlalchemy import create_engine
MYSQL_URL = "mysql+pymysql://USER:PASSWORD@localhost:3306/DBNAME"
df = pd.read_json("../output/iot_anomalies.json")
engine = create_engine(MYSQL_URL, pool_pre_ping=True)
df.to_sql("pp_iot_anomalies", con=engine, if_exists="replace", index=False)
print("MySQL export done ✅")
Run steps
- Install deps: pip install -r requirements.txt
- Run JSONL version first: python difficult_iot_anomaly_monitor.py
- Enable Mongo input/export by editing URLs inside script
- Enable MySQL export similarly (edit MYSQL_URL)
- Check output/ for CSV/JSON/PNG