Archived
1
0

Some working db writer code.

This commit is contained in:
Shaun Setlock
2021-10-23 07:47:59 -04:00
parent fbdc9ca21e
commit 1322637fef
2 changed files with 42 additions and 70 deletions

2
.gitignore vendored
View File

@@ -138,3 +138,5 @@ dmypy.json
# Cython debug symbols # Cython debug symbols
cython_debug/ cython_debug/
# Secrets
.creds*

View File

@@ -1,16 +1,15 @@
#! env/bin/python3 #! env/bin/python3
# import the psycopg2 database adapter for PostgreSQL # import the psycopg2 database adapter for PostgreSQL
from psycopg2 import connect, Error import psycopg2
import json
from psycopg2.extras import Json from psycopg2.extras import Json
from psycopg2.extras import json as psycop_json import json
import sys import sys
def connect_db(db: str,host: str,user: str,passwd: str): def connect_db(db: str,host: str,user: str,passwd: str):
try: try:
# declare a new PostgreSQL connection object # declare a new PostgreSQL connection object
conn = connect( conn = psycopg2.connect(
dbname = db, dbname = db,
user = user, user = user,
host = host, host = host,
@@ -18,86 +17,57 @@ def connect_db(db: str,host: str,user: str,passwd: str):
# attempt to connect for 3 seconds then raise exception # attempt to connect for 3 seconds then raise exception
connect_timeout = 3 connect_timeout = 3
) )
cur = conn.cursor()
except (Exception, Error) as err: except (Exception, psycopg2.Error) as err:
#print ("\npsycopg2 connect error:", err) #print ("\npsycopg2 connect error:", err)
#conn = None
cur = None
return cur
# use Python's open() function to load the JSON data
with open('postgres-records.json') as json_data:
# use load() rather than loads() for JSON files
record_list = json.load(json_data)
# concatenate an SQL string
sql_string = 'INSERT INTO {} '.format( table_name )
# if record list then get column names from first key
if type(record_list) == list:
first_record = record_list[0]
columns = list(first_record.keys())
print ("\ncolumn names:", columns)
# if just one dict obj or nested JSON dict
else:
print ("Needs to be an array of JSON objects")
sys.exit()
# enclose the column names within parenthesis
sql_string += "(" + ', '.join(columns) + ")\nVALUES "
# only attempt to execute SQL if cursor is valid
if cur != None:
try:
cur.execute( sql_string )
conn.commit()
print ('\nfinished INSERT INTO execution')
except (Exception, Error) as error:
print("\nexecute_sql() error:", error)
conn.rollback()
# close the cursor and connection
cur.close()
conn.close()
######################################################3
import psycopg2
from config import config
def insert_vendor(vendor_name):
""" insert a new vendor into the vendors table """
sql = """INSERT INTO vendors(vendor_name)
VALUES(%s) RETURNING vendor_id;"""
conn = None conn = None
vendor_id = None return conn
def get_db_creds(file: str):
with open(file) as cred_file:
creds = json.load(cred_file)
return creds
def insert_data(conn, data):
# insert a new vendor into the vendors table
sql = """
INSERT INTO
air(datetime, temperature, humidity)
VALUES
(%s, %s, %s)
"""
try: try:
# read database configuration # open cursor on our db connection
params = config()
# connect to the PostgreSQL database
conn = psycopg2.connect(**params)
# create a new cursor
cur = conn.cursor() cur = conn.cursor()
# execute the INSERT statement # execute the INSERT statement
cur.execute(sql, (vendor_name,)) data = (data["datetime"], data["temperature"], data["humidity"])
# get the generated id back cur.execute(sql,data)
vendor_id = cur.fetchone()[0]
# commit the changes to the database # commit the changes to the database
conn.commit() conn.commit()
# close communication with the database # close communication with the database
cur.close() cur.close()
except (Exception, psycopg2.DatabaseError) as error: except (Exception, psycopg2.DatabaseError) as error:
print(error) print(error)
finally: finally:
if conn is not None: if conn is not None:
conn.close() conn.close()
return vendor_id if __name__ == "__main__":
data = {
"datetime": "2021-10-23 01:58:08.205911",
"temperature": "73.4",
"humidity": "49.2"
}
creds = get_db_creds("./.creds.json")
conn = connect_db(creds["db"], creds["host"], creds["user"], creds["passwd"])
insert_data(conn, data)