Skip to content

Handling databases using python

I want to create a database for storing the predictions and actual values of the air pollution near my home in Hyderabad. In a previous blog, I have demonstrated how we can deploy a flask application using pythonanywhere which predicts the pollution using a machine learning model that I built. In this blog, we will see how we can store this information in a database to be used either for visualisation or other analytics.

Free database on ElephantSQL

Elephant SQL automates setup and running of PostgreSQL clusters. They have a free plan called "Tiny turtle" which gives us 5 concurrent connections and 20 MB of data. This is sufficient for storing our data.
png
Using the free plan, we can create an instance called "hydpm25" (For Hyderabad PM25 data) png

Creating tables

Once we have a cluster created, we can create tables in three ways.
1. Using SQL commands on "SQL Browser"
2. Using psycopg2 in Python
3. Using SQLAlchemy

SQL Queries

First, let us create a table using SQL Browser. For creating a new table "daily_historic_pollution" which will contain the historic pollution data of Hyderabad we can use the following query.

CREATE DATABASE hyderabad_aqi;
CREATE TABLE daily_historic_pollution(
  date DATE PRIMARY KEY,
  type VARCHAR(20),
  value FLOAT
);

Load data using python

We can use psycopg to connect and create the same table. The URL to connect is provided by ElephantSQL under "Details".

After connecting to the database, we can create tables using SQL commands.

import psycopg2
connection_string = "postgresql://username:password@hostname:hostnumber/database"
conn = psycopg2.connect(connection_string)

conn.set_session(autocommit=True)
cur = conn.cursor()

cur.execute(
  """
  DROP TABLE IF EXISTS daily_historic_pollution
  """)

cur.execute(
  """
  CREATE TABLE daily_historic_pollution (date DATE PRIMARY KEY,
                         type VARCHAR(20),
                         value INTEGER
                         )
  """)
print("Created table")
cur.close()

conn.close()
Created table

SqlAlchemy

According to me, the best way to interface with a database is using SqlAlchemy. For example, consider the data from AQICN's API on the air pollution data for Hyderabad.

import pandas as pd
df = pd.read_csv('hyderabad-us consulate, india-air-quality.csv')
df['type'] = 'pm25'
df.columns = ['date', 'value', 'type']
df.head()
date value type
0 2021/10/1 87 pm25
1 2021/10/2 47 pm25
2 2021/10/3 50 pm25
3 2021/9/1 66 pm25
4 2021/9/2 74 pm25

Using SqlAlchemy, we can upload this data to the previously created table.

from sqlalchemy import create_engine
import numpy
from psycopg2.extensions import register_adapter, AsIs
def addapt_numpy_float64(numpy_float64):
    return AsIs(numpy_float64)
def addapt_numpy_int64(numpy_int64):
    return AsIs(numpy_int64)
register_adapter(numpy.float64, addapt_numpy_float64)
register_adapter(numpy.int64, addapt_numpy_int64)
engine = create_engine(connection_string)
for i in range(len(df)):
    query = """INSERT INTO daily_historic_pollution (date, type, value) VALUES(%s, %s, %s)
    ON CONFLICT (date) DO UPDATE 
      SET type = excluded.type, 
      value = excluded.value;
    """
    engine.execute(query, (df.date[i], df.type[i], df.value[i]))

png

HTTP requests

In this way, we can load data into a database. Now I want to create a new table that can store the predictions from the flask application into a table.

conn = psycopg2.connect(connection_string)
conn.set_session(autocommit=True)
cur = conn.cursor()

cur.execute(
  """
  DROP TABLE IF EXISTS daily_prediction_pollution
  """)

cur.execute(
  """
  CREATE TABLE daily_prediction_pollution (date DATE PRIMARY KEY,
                         type VARCHAR(20),
                         prediction FLOAT
                         )
  """)
print("Created table")
cur.close()

conn.close()
Created table

We can use the GET request from the flask application in the previous blog to predict PM25 for the date.

import requests
from datetime import date, datetime
today = str(date.today().year)+'-'+str(date.today().month)+'-'+str(date.today().day)
response = requests.get("https://harshaash.pythonanywhere.com/predict", params={'date': today})
response.json()['prediction'][0]
115.6

The prediction is 115.6 PM25. We can use our knowledge on SqlAlchemy to store this value in a database.

query2 = """INSERT INTO daily_prediction_pollution (date, type, prediction) VALUES(%s, %s, %s)
    ON CONFLICT (date) DO UPDATE 
      SET type = excluded.type, 
      prediction = excluded.prediction;
    """
engine.execute(query2, (today, 'pm25', response.json()['prediction'][0]))

We can see this result in the database png We can get actual pollution data for the past from JSON API of AQICN.org. We can use this data to update the database with the previous day's value.

aqi_token = {'token':'default_token'}
response = requests.get("https://api.waqi.info/feed/hyderabad/", params=aqi_token)
data = pd.DataFrame(response.json()['data']['forecast']['daily']['pm25'])
data.day = pd.to_datetime(data.day)
data['type'] = 'pm25'
data.columns = ['value', 'date', 'max', 'min', 'type']
data
value date max min type
0 158 2021-10-15 159 145 pm25
1 150 2021-10-16 159 138 pm25
2 152 2021-10-17 169 138 pm25
3 140 2021-10-18 159 138 pm25
4 145 2021-10-19 158 138 pm25
5 138 2021-10-20 138 138 pm25
6 134 2021-10-21 138 89 pm25
7 133 2021-10-22 138 89 pm25
8 138 2021-10-23 138 138 pm25

From the API result, we can see the previous two day's data and predictions (by AQICN) for the next few days. We can use this API to update the data for previous day's data in our table.

for i in range(2):
    query = """INSERT INTO daily_historic_pollution (date, type, value) VALUES(%s, %s, %s)
    ON CONFLICT (date) DO UPDATE 
      SET type = excluded.type, 
      value = excluded.value;
    """
    engine.execute(query, (data.date[i], data.type[i], data.value[i]))

Scheduling using GitHub Actions

I want to run these predictions every day, and the data/predictions are stored at a set point of time every day. To do this, we can combine all the codes above into one python file that gets executed every day. This file contains code that will predict the pollution for today and store this value along with the pollution for yesterday in the database.

# File name: aqi_script.py
import pandas as pd
import requests
from datetime import date, datetime
from sqlalchemy import create_engine
import numpy
from psycopg2.extensions import register_adapter, AsIs
def addapt_numpy_float64(numpy_float64):
    return AsIs(numpy_float64)
def addapt_numpy_int64(numpy_int64):
    return AsIs(numpy_int64)
register_adapter(numpy.float64, addapt_numpy_float64)
register_adapter(numpy.int64, addapt_numpy_int64)

connection_string = "postgresql://username:password@hostname:hostnumber/database"
engine = create_engine(connection_string)
today = str(date.today().year)+'-'+str(date.today().month)+'-'+str(date.today().day)

# Upload the prediction data
response = requests.get("https://harshaash.pythonanywhere.com/predict", params={'date': today})
query2 = """INSERT INTO daily_prediction_pollution (date, type, prediction) VALUES(%s, %s, %s)
    ON CONFLICT (date) DO UPDATE 
      SET type = excluded.type, 
      prediction = excluded.prediction;
    """
engine.execute(query2, (today, 'pm25', response.json()['prediction'][0]))

# Upload the actual data
aqi_token = {'token':'dummy_token'}
response = requests.get("https://api.waqi.info/feed/hyderabad/", params=aqi_token)
data = pd.DataFrame(response.json()['data']['forecast']['daily']['pm25'])
data.day = pd.to_datetime(data.day)
data['type'] = 'pm25'
data.columns = ['value', 'date', 'max', 'min', 'type']
for i in range(2):
    query = """INSERT INTO daily_historic_pollution (date, type, value) VALUES(%s, %s, %s)
    ON CONFLICT (date) DO UPDATE 
      SET type = excluded.type, 
      value = excluded.value;
    """
    engine.execute(query, (data.date[i], data.type[i], data.value[i]))

We can schedule this code to run every day at a particular time so that our database is updated every day with the predictions. This can be done using cron jobs. One way to implement cron jobs is using GitHub actions. To create a cron job, a yml file should be created within the .github/workflows folder in the GitHub repository. This yml file is converted to actions, and this is shown on the actions page. For more details, refer here. The yml file for our use case would be:

# FIle name: any_name.yml
# location: .github/workflows
name: update_database_AQI_data
on:
  schedule:
    - cron: '5 1 * * *' # to run at 1:05 AM GMT everyday

jobs:
  build:
    name: Build project
    runs-on: ubuntu-latest
    steps:

      - name: Checkout repository
        uses: actions/checkout@v2 # Downloads the current git repo

      - name: Setup Python
        uses: actions/setup-python@v2 # Installs the python setup

      - name: Install dependancies # Installs the required packages
        run: |
          python -m pip install --upgrade pip 
          pip install -r requirements.txt

      - name: Execute python file # Run the script file
        run: python aqi_script.py

Created by Achyuthuni Sri Harsha

Back to top