The Delphix Masking Engine now features an interactive API client that can generate commands specific to your masking engine. With those
commands

  1. Identify which jobs need to run and keep those Jobs in jobIDs.txt file
  2. Running Jobs
    1. $ python3 run_jobs.py jobIDs.txt
      reading in current job ID: 1334
      found an ID: 1334, going to attempt to run job with this ID.
      completed job ID: 1334
import requests
import json
import sys
import time
import keyring
config = open('config.json',)
config_file = json.load(config)
for eng_config in config_file['engine']:
    baseUrl = eng_config['http/https'] + '://' + eng_config['ip'] + ':' + eng_config['port'] + "/masking/api/"
def api_login(username):
    # login into engine
    #Initialize session
    session = requests.session()
    # login into Masking Engine
    loginUrl = baseUrl + 'login'
    LOGIN_DATA = {
        'type': 'LoginRequest',
        'username': username,
        'password': password
        #'password': keyring.get_password('DEuser', username)
    }
    postLogin = session.post(loginUrl, json=LOGIN_DATA)
    if postLogin.status_code != 200:
        print('Masking engine credentials are incorrect. Exiting script.')
        sys.exit()
    # Add session to headers. Note: .json() method reads response as a dictionary.
    session.headers.update(postLogin.json())
    return session
user = sys.argv[1]
#password = sys.argv[2]
for user_config in config_file['user']:
    #password = user_config['password']
    password = keyring.set_password('DEuser', user, user_config['password'])
#filePath = sys.argv[3]
filePath = sys.argv[2]
jobIDfileName = sys.argv[3]
#create errorLog
errorLog = open(filePath+"run_jobs_errorLog.txt", "w")
#create successLog
successLog = open(filePath+"run_jobs_successLog.txt", "w")
# Pass in username and password that was prompted into the api_login method
SESSION = api_login(user)
# Create data dictionary for all engine job metadata
jobDict = {}
envDict = {}
jobDictTest = {}
jobIdList = []
#get environment details and create data dictionary to house all masking/profile jobs
envURL = baseUrl + "environments"
envIdList = []
getEnvInfo = SESSION.get(envURL)
getEnvInfoData = getEnvInfo.json()
def get_env_IDs():
    #gather list of environment IDs in order to retrieve job metadata
    for envs in getEnvInfoData['responseList']:
        envIdList.append(envs['environmentId'])
        envDict[envs['environmentName']] = envs['environmentId']
    #print(envDict)
    return envIdList, envDict
def get_job_metadata():
    for envName in envDict:
        #print("name is: "+str(envDict.get(envName)))
        pageNum = 1
        prfURL = baseUrl + "profile-jobs?page_number=" + str(pageNum) + "&page_size=1000&environment_id=" + str(envDict.get(envName))
        # print(prfURL)
        getPrf = SESSION.get(prfURL)
        while getPrf.ok:
            getPrfData = getPrf.json()
            for jobs in getPrfData['responseList']:
                jobID = jobs['profileJobId']
                jobDictName = jobs['jobName']
                jobKey = envName+"|"+jobDictName
                jobIdList.append(jobID)
                jobDictTest[jobKey] = jobID
            # print(getPrfData)
            pageNum += 1
            # print("pageNum is: " + str(pageNum))
            prfURL = baseUrl + "profile-jobs?page_number=" + str(pageNum) + "&page_size=1000&environment_id=" + str(envDict.get(envName))
            getPrf = SESSION.get(prfURL)
        ##############################################################################################################
        pageNum = 1
        mskURL = baseUrl + "masking-jobs?page_number=" + str(pageNum) + "&page_size=1000&environment_id=" + str(envDict.get(envName))
        # print(prfURL)
        getMsk = SESSION.get(mskURL)
        while getMsk.ok:
            getMskData = getMsk.json()
            for jobs in getMskData['responseList']:
                jobID = jobs['maskingJobId']
                jobDictName = jobs['jobName']
                #jobDict[jobID] = jobName, ID
                jobKey = envName + "|" + jobDictName
                jobIdList.append(jobID)
                jobDictTest[jobKey] = jobID
            # print(getPrfData)
            pageNum += 1
            # print("pageNum is: " + str(pageNum))
            mskURL = baseUrl + "masking-jobs?page_number=" + str(pageNum) + "&page_size=1000&environment_id=" + str(envDict.get(envName))
            getMsk = SESSION.get(mskURL)
    #print("jobIdList: "+str(jobIdList))
def run_job(fileName):
    runningJobCnt = 0
    with open(fileName, "r") as jobIdFile:
        for line in jobIdFile:
            jobID = line.strip()
            print("reading in current job ID: "+str(jobID))
            if jobID.isnumeric():
                print("found an ID: "+jobID+", going to attempt to run job with this ID.")
                JOB_DATA = {
                    "jobId": str(jobID)
                }
                prfUrl = baseUrl + "executions"
                postJOB = SESSION.post(prfUrl, json=JOB_DATA)
                if postJOB.ok:
                    runningJobCnt = runningJobCnt + 1
                    while runningJobCnt > 0:
                        time.sleep(15)
                        postJOBdata = postJOB.json()
                        executionId = postJOBdata['executionId']
                        jobUri = baseUrl + "executions/" + str(executionId)
                        job = SESSION.get(jobUri)
                        jobData = job.json()
                        status = jobData['status']
                        jobId = jobData['jobId']
                        if status == "RUNNING":
                            print("job is still running")
                        elif status == "SUCCEEDED":
                            print("completed job ID: " + str(jobId))
                            successLog.write("completed job ID: " + str(jobId)+"\n")
                            runningJobCnt = runningJobCnt - 1
                        elif status == "FAILED":
                            print("Failure to run job ID" + str(jobId))
                            errorLog.write("Failure to run job ID" + str(jobId)+"\n")
                            runningJobCnt = runningJobCnt - 1
                        else:
                            print("unknown job status")
                else:
                    print("ID:"+jobID+" is not found, moving to the next" )
                    errorLog.write("ID:"+jobID+" was not found." + "\n")
            else:
                if jobID.find('|') != -1:
                    jobName = jobID.partition('|')[2]
                    envName = jobID.partition('|')[0]
                    currJob = envName+"|"+jobName
                    print("currJob = " +currJob)
                    execID = ''
                    execID = jobDictTest.get(currJob)
                    print("execId = "+str(execID))
                    if execID != '':
                        JOB_DATA = {
                        "jobId": execID
                        }
                        #print("execID is: "+execID)
                        prfUrl = baseUrl + "executions"
                        postJOB = SESSION.post(prfUrl, json=JOB_DATA)
                        if postJOB.ok:
                            runningJobCnt = runningJobCnt + 1
                            while runningJobCnt > 0:
                                time.sleep(5)
                                postJOBdata = postJOB.json()
                                executionId = postJOBdata['executionId']
                                jobUri = baseUrl + "executions/" + str(executionId)
                                job = SESSION.get(jobUri)
                                jobData = job.json()
                                status = jobData['status']
                                jobId = jobData['jobId']
                                if status == "RUNNING":
                                    print("job is still running")
                                elif status == "SUCCEEDED":
                                    print("completed job ID: " + str(jobID))
                                    successLog.write("completed job ID: " + str(jobID)+"\n")
                                    runningJobCnt = runningJobCnt - 1
                                    api_login(user)
                                elif status == "FAILED":
                                    print("Failure to run job ID" + str(jobID))
                                    errorLog.write("Failure to run job ID" + str(jobID)+"\n")
                                    runningJobCnt = runningJobCnt - 1
                                else:
                                    print("unknown job status")
                                    api_login(user)
                    else:
                        print("job name: " + jobID + " not found")
                        errorLog.write("job name: " + jobID + " was not found." + "\n")
                else:
                    print("job name: "+jobID+" was either not formatted properly or not found in valid job list.")
                    errorLog.write("job name: "+jobID+" was not formatted properly."+"\n")
get_env_IDs()
get_job_metadata()
run_job(filePath+jobIDfileName)
errorLog.close()
successLog.close()

Leave a comment