#! env/bin/python3
"""Small psycopg2 helpers for inserting data into PostgreSQL.

* ``connect_db`` opens a connection and hands back a cursor.
* ``insert_json_records`` bulk-inserts an array of JSON objects into a table.
* ``insert_vendor`` inserts one row into ``vendors`` and returns its id.
"""

# import the psycopg2 database adapter for PostgreSQL
import json
import sys

import psycopg2
from psycopg2 import connect, Error
from psycopg2 import sql
from psycopg2.extras import Json
from psycopg2.extras import json as psycop_json

from config import config


def connect_db(db: str, host: str, user: str, passwd: str):
    """Open a PostgreSQL connection and return a cursor on it.

    Args:
        db: Name of the database to connect to.
        host: Host name or address of the server.
        user: Role to authenticate as.
        passwd: Password for ``user``.

    Returns:
        A psycopg2 cursor, or ``None`` when connecting (or creating the
        cursor) failed. The owning connection is available as the cursor's
        ``connection`` attribute; the caller is responsible for committing
        and closing it.
    """
    conn = None
    try:
        # declare a new PostgreSQL connection object
        conn = connect(
            dbname=db,
            user=user,
            host=host,
            password=passwd,
            # attempt to connect for 3 seconds then raise exception
            connect_timeout=3,
        )
        return conn.cursor()
    except Exception as err:  # psycopg2.Error is a subclass of Exception
        # Stay best-effort (callers test for None), but say why it failed.
        print("\npsycopg2 connect error:", err, file=sys.stderr)
        if conn is not None:
            # connect() worked but cursor() did not: don't leak the socket.
            conn.close()
        return None


def _load_records(json_path):
    """Read ``json_path`` and return its top-level array of JSON objects.

    Raises:
        ValueError: if the document is not an array whose items are all
            JSON objects.
    """
    # use load() rather than loads() for JSON files
    with open(json_path, encoding="utf-8") as json_data:
        record_list = json.load(json_data)

    is_object_array = isinstance(record_list, list) and all(
        isinstance(record, dict) for record in record_list
    )
    if not is_object_array:
        raise ValueError("Needs to be an array of JSON objects")
    return record_list


def _build_insert_query(table_name, columns):
    """Compose ``INSERT INTO table (cols...) VALUES (%s, ...)`` safely.

    Identifiers are quoted through ``psycopg2.sql`` rather than pasted into
    the string, because the column names come from the JSON file. Quoted
    identifiers are case-sensitive in PostgreSQL, so the names must match
    the table definition exactly.
    """
    return sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier(table_name),
        sql.SQL(", ").join(sql.Identifier(name) for name in columns),
        sql.SQL(", ").join(sql.Placeholder() for _ in columns),
    )


def _adapt_value(value):
    """Wrap nested JSON objects so psycopg2 can send them to the server.

    Plain ``dict`` values have no default psycopg2 adaptation; everything
    else (including lists, which psycopg2 sends as arrays) is passed through.
    """
    return Json(value) if isinstance(value, dict) else value


def insert_json_records(table_name, db, host, user, passwd,
                        json_path='postgres-records.json'):
    """Insert every object of a JSON array file as a row of ``table_name``.

    The column list is taken from the keys of the first record; a later
    record that lacks one of those keys contributes NULL for it.

    Args:
        table_name: Unqualified name of the target table.
        db, host, user, passwd: Connection settings, passed to ``connect_db``.
        json_path: Path of the JSON file holding an array of objects.

    Returns:
        The number of records inserted; ``0`` when the file holds an empty
        array, the connection could not be opened, or the insert failed (in
        which case the transaction is rolled back).

    Raises:
        ValueError: if the file is not an array of JSON objects.
        OSError, json.JSONDecodeError: if the file cannot be read or parsed.
    """
    record_list = _load_records(json_path)
    if not record_list:
        # Nothing to insert, and no first record to take column names from.
        return 0

    # get column names from the keys of the first record
    columns = list(record_list[0])
    print("\ncolumn names:", columns)

    query = _build_insert_query(table_name, columns)
    rows = [
        tuple(_adapt_value(record.get(name)) for name in columns)
        for record in record_list
    ]

    # only attempt to execute SQL if cursor is valid
    cur = connect_db(db, host, user, passwd)
    if cur is None:
        return 0

    conn = cur.connection
    try:
        cur.executemany(query, rows)
        conn.commit()
        print('\nfinished INSERT INTO execution')
        return len(rows)
    except (Exception, Error) as error:
        print("\nexecute_sql() error:", error)
        conn.rollback()
        return 0
    finally:
        # close the cursor and connection
        cur.close()
        conn.close()


######################################################


def insert_vendor(vendor_name):
    """Insert a new vendor into the vendors table.

    Args:
        vendor_name: Value for the ``vendor_name`` column.

    Returns:
        The ``vendor_id`` generated for the new row, or ``None`` if anything
        failed (the error is printed).
    """
    # Not named ``sql``: that would shadow the psycopg2.sql module above.
    insert_sql = ("INSERT INTO vendors(vendor_name) "
                  "VALUES(%s) RETURNING vendor_id;")
    conn = None
    vendor_id = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # the with-block closes the cursor even if execute/fetch raises
        with conn.cursor() as cur:
            # execute the INSERT statement
            cur.execute(insert_sql, (vendor_name,))
            # get the generated id back
            vendor_id = cur.fetchone()[0]
        # commit the changes to the database
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        # Never report an id for a row that was not committed.
        vendor_id = None
    finally:
        if conn is not None:
            conn.close()

    return vendor_id