ETL for ObservePoint API Journey Rules

Create the table in our PostgreSQL database:

CREATE TABLE public.rules
(
    fromtemplate BOOLEAN,
    rulename text COLLATE pg_catalog."default",
    checktimes INTEGER,
    accountid INTEGER,
    pagefilters text COLLATE pg_catalog."default",
    ruleid INTEGER NOT NULL,
    createdby INTEGER,
    matchallfilters BOOLEAN,
    createdat TIMESTAMP WITHOUT TIME zone,
    updatedat TIMESTAMP WITHOUT TIME zone,
    tags text COLLATE pg_catalog."default",
    labels text COLLATE pg_catalog."default",
    recepients text COLLATE pg_catalog."default",
    CONSTRAINT rules_pkey PRIMARY KEY (ruleid)
)

Extract the data from the API and load it into the PostgreSQL DB:

def loadRules(c):
    print(':loadRules() start')


    apiconn = http.client.HTTPSConnection("api.observepoint.com")
    payload = "{}"
    headers = { 'authorization': "api_key yourapikey" }
   
    endpoint = "/v2/rules"
    apiconn.request("GET", endpoint, payload, headers)
    res = apiconn.getresponse()
    data = res.read()
    de_data = data.decode("utf-8")
    jdata = json.loads(de_data)

    for rownum, drow in enumerate(jdata, 1):

        #create an empty list of full values since calls return partial values
        insertvaluesnames = ['fromTemplate','name','updatedAt','tags','recepients','checkTimes','accountId','pageFilters','id','labels','createdAt','createdBy','matchAllFilters']
        #insertvaluesnames = ['tags','id']
        insertvalues = [None] * len(insertvaluesnames)
       
        #dtlist is for dates
        dtlist=['createdAt','updatedAt']
        #dumplist is for dict
        dumplist=['tags','recepients','pageFilters','labels']
       
        for dfield in drow:
            #print(dfield + ':', end='')
            #print(drow.get(dfield))
           
            for insert_list_index,currentvalue in enumerate(insertvaluesnames):
                if dfield == currentvalue:
                    if dfield in dtlist:
                        insertvalues[insert_list_index] = drow.get(dfield)[0:-10]
                    elif dfield in dumplist:
                        insertvalues[insert_list_index] = json.dumps(drow.get(dfield),sort_keys=True)
                    else:
                        insertvalues[insert_list_index] = drow.get(dfield)
               
             
        queryFunc.insertRule(c,insertvalues)
       
        #if rownum == 1:
        #    break
       
    print(':loadRules() end')

Using this function to insert the row

def insertRule(c,a):
    """insertRule()START  """
    try:
        sql = """
        INSERT INTO rules (fromtemplate, rulename, updatedat, tags, recepients,
                                checktimes, accountid, pagefilters, ruleid,
                                labels, createdat, createdby, matchallfilters)
             VALUES(%s, %s, %s, %s, %s,
                                %s, %s, %s, %s,
                                %s, %s, %s, %s)
             """

        # create a cursor
        cur = c.cursor()
        cur.execute(sql, a)
        c.commit()
        #print('inserted action')
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

Leave a Reply

Your email address will not be published. Required fields are marked *