To onboard any application or new schema to Delphix Masking, the key ingredient is to profile its data elements with Delphix and share the results with the application team, so that they can identify which elements to mask without breaking their application.

This approach helps onboard any application to Delphix Masking/Profiling within a couple of minutes. A Delphix DBA can run the Profiler [PII scan of elements] against any Oracle database using the Delphix APIs.

  1. A key feature of Delphix's data discovery is that the profile expressions of a profile set are mapped directly to algorithms, so the profiling results can be used immediately to mask that data.
Delphix Profiler Job Creation

2. Testers and application owners depend on Delphix DBAs to run the profiler and share the results with them.

The Bigger Challenge: “That’s great for one environment, but we have thousands!”

  • Using Python, we have built a framework that retrieves Delphix Profiler results without the help of Delphix DBAs.
  • The process runs on Python and Rundeck Community Edition.
  • Once the application team has reviewed the results, masking jobs can be created from a CSV file.
  • Log in to the automation server and run the Python scripts as described below.

Things to know: 

  1. The profiler framework is Python code written against the Delphix APIs.
  2. The Python scripts connect to the Oracle database, fetch the schema's tables, and add them to a Delphix rule set.
  3. To simplify running the Python scripts on the server, we created an easy-to-run Rundeck job that takes a set of inputs.
[oracle@ dpx_run_profiler]$ python3 onboard_inventory.py DATABASE t53dbadeedst1x 1521 SCHEMA DATABASE_INSTANCE PROFILER_NUMBER EMAIL
Running cx_Oracle version 8.2.1 built at Jun 1 2021 19:08:15
File: /usr/local/lib64/python3.6/site-packages/cx_Oracle.cpython-36m-x86_64-linux-gnu.so
Oracle Instant Client Version: 21.1.0.0.0
Masking engine credentials are correct
file creation successful
Successful Connected to Oracle Database: tedw1
SchemaTableList CSV file is created
connection creation successful
connID is: 927
/u1/DataOps/dpx_run_profiler/
SchemaTableList.csv
STG_DEEDS_PPP:AUDIT_AUDITEVENT table was successfully added:1384
STG_DEEDS_PPP:FGNDOCUMENT table was successfully added:1384
STG_DEEDS_PPP:FGNHISTORY table was successfully added:1384
STG_DEEDS_PPP:FGNHISTORY_BKP_0612 table was successfully added:1384
STG_DEEDS_PPP:FGNLOANACTIONS table was successfully added:1384
STG_DEEDS_PPP:FGNLOANSCHEDULE table was successfully added:1384
STG_DEEDS_PPP:FGNNOTE table was successfully added:1384
STG_DEEDS_PPP:FGNPACKAGE table was successfully added:1384
STG_DEEDS_PPP:FGNPACKAGEVIEW table was successfully added:1384
STG_DEEDS_PPP:FGNPACKAGE_BKP_0612 table was successfully added:1384
STG_DEEDS_PPP:FGNPPPDBA table was successfully added:1384
reading in current job ID: 1519
{'jobId': '1519'}
completed job ID: 1519
1519
{'executionId': 4832, 'jobId': 1519, 'status': 'RUNNING', 'startTime': '2021-09-07T14:00:21.087+0000'}
4832
curl file is created
Downloading the Profiler Result file
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 156k 100 156k 0 0 3787k 0 --:--:-- --:--:-- --:--:-- 3822k
Successful Downloaded the Profiler Result file & sent to Email Address
job-report_STG_DEEDS_PPP.pdf
Mail Sent
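
The onboard_inventory.py script reads its engine and user settings from a config.json file that this post does not show. Judging only from the keys the code accesses (engine, user, http/https, ip, port, password), a minimal sketch of that layout, and of how the base URL is assembled from it, might look like the following. All values are placeholders, and the helper name build_base_url is hypothetical.

```python
import json

# Hypothetical config.json content: only the key names are taken from the
# script; every value here is a placeholder.
SAMPLE_CONFIG = """
{
  "engine": [{"http/https": "http", "ip": "10.0.0.10", "port": "80"}],
  "user": [{"password": "placeholder-password"}]
}
"""


def build_base_url(engine):
    """Assemble the masking API base URL the same way the script does."""
    return (engine["http/https"] + "://" + engine["ip"] + ":"
            + engine["port"] + "/masking/api/")


config = json.loads(SAMPLE_CONFIG)
base_url = build_base_url(config["engine"][0])
print(base_url)
```

Note that port is stored as a string here, because the script concatenates it directly into the URL.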
onboard_inventory.py

# Author : Sandeep Reddy Narani
# Created : 05/11/2020
# Purpose : Connects and Fetch data from Oracle database and Delphix Profiler to scan PII Elements.
############################################################################
import json
import sys
import csv
import api_functions
import keyring
import os
import time
import cx_Oracle
import sendemail

# Change according to OS version
os.chdir("E:\\DevOps\\dpx_run_profiler\\")
filePath = os.getcwd()+"\\"
username = 'admin'
# Read the engine and user settings, closing the file once it is parsed
with open(filePath+'config.json') as config:
    config_file = json.load(config)

for eng_config in config_file['engine']:
    baseUrl = eng_config['http/https'] + '://' + eng_config['ip'] + ':' + eng_config['port'] + "/masking/api/"
    execbaseUrl = eng_config['http/https'] + '://' + eng_config['ip'] + '/masking/monitorJobsDBCompleted.do?action=exportMaskingReport&reportType=P&executionId='

for user_config in config_file['user']:
    #print (user_config)
    # Store the engine password in the keyring; api_login() reads it back
    keyring.set_password('DEuser', username, user_config['password'])

SESSION = api_functions.api_login(username, baseUrl)

#generate error/success log files and read in CSVs for connector/rule set creation
filePath, errorLog, successLog, jobMapLog, schemaTableList, connCSV = api_functions.file_load()

encKey = api_functions.load_key()

# For loop to read CSV
with open(filePath+connCSV, encoding='utf-8') as csvfile:
    #keep track of what row we are on in connCSV. For error reporting services.
    rowNum = 2

    readCSV = csv.DictReader(csvfile, delimiter=',')
    for row in readCSV:
        # Command-line arguments override most of the CSV columns
        #connectorName = str(row['connectorName'])
        connectorName = sys.argv[1]
        databaseType = str(row['databaseType'])
        #databaseName = str(row['databaseName'])
        databaseName = sys.argv[2]
        #print(databaseName)
        #environmentId = str(row['environmentId'])
        environmentId = sys.argv[3]
        #host = str(row['host'])
        #port = str(row['port'])
        host = sys.argv[4]
        port = sys.argv[5]
        #schemaName = str(row['schemaName'])
        schemaName = sys.argv[6]
        #sid = str(row['sid'])
        sid = sys.argv[7]
        connUsername = str(row['username'])
        connPassword = row['password']
        #profileId = str(row['profileId'])
        profileId = sys.argv[8]
        #test = api_functions.decrypt(encKey, connectorName, connPassword)
        orclDbconn = api_functions.loadoracle(filePath, encKey, host, port, schemaName, sid, connUsername, connPassword, databaseName)
        #print ("Database Connection is:" +str(orclDbconn))
        #runJobID=1484
        #getIvn = api_functions.get_inventory(SESSION, runJobID, baseUrl, execbaseUrl,filePath, schemaName )
        envName, rsName, prfName, mskName, connID, currSchemaName = api_functions.create_connector(encKey, SESSION, connectorName, databaseType, databaseName, environmentId, host, port, schemaName, sid, connUsername, connPassword, baseUrl)

        print("connID is: "+str(connID))
        if connID == "EMPTY":
            errorLog.write("Cannot create connector line:" + str(rowNum) + " " + str(row['connectorName']) + "\n")
        else:
            successLog.write("Successful creation of connector:" + str(rowNum) + " " + str(row['connectorName']) + "\n")

            rsID = api_functions.create_rule_set(SESSION, connID, rsName, baseUrl)

            add_tables = api_functions.add_tables_to_rule_set(SESSION, currSchemaName, rsID, errorLog, filePath, schemaTableList, baseUrl)

            prfJobID = api_functions.create_profile_job(SESSION, rsID, profileId, prfName, baseUrl)

            print ("ProfileJobID is:" +str(prfJobID))

            runJobID = api_functions.run_job(SESSION, prfJobID, baseUrl)
            time.sleep(10)
            runJobID=str(prfJobID)
            getIvn = api_functions.get_inventory(SESSION, runJobID, baseUrl, execbaseUrl,filePath, schemaName )
            time.sleep(10)
            print("Downloading the Profiler Result file")
            os.system('sh curl.sh')
            print("Successful Downloaded the Profiler Result file & sent to Email Address")
            dpdfname = "job-report_"+schemaName+".pdf"
            rvemail = sys.argv[9]
            sendemail.sendpdffile(dpdfname, rvemail)
            #mskJobID = api_functions.create_mask_job(SESSION, rsID, mskName, baseUrl)

            #jobMapLog.write("envID:"+ str(environmentId) + " envName: "+envName + " prfID: "+str(prfJobID)+" prfName: "+prfName+ " mskID: "+str(mskJobID)+ " mskName: "+mskName+"\n")

        rowNum = rowNum + 1

errorLog.close()

successLog.close()
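
The script above reads nine positional values straight from sys.argv, so a missing or misplaced argument only surfaces as an IndexError or a wrong connector. As a sketch of one possible alternative (not part of the original script), argparse can name the same nine positions in the same order. The argument names below are made up for illustration, and the sample values are placeholders.

```python
import argparse


def parse_args(argv=None):
    """Name the nine positional arguments in the order the script reads them."""
    parser = argparse.ArgumentParser(
        description="Onboard an Oracle schema to the Delphix profiler")
    parser.add_argument("connector_name")   # sys.argv[1]
    parser.add_argument("database_name")    # sys.argv[2]
    parser.add_argument("environment_id")   # sys.argv[3]
    parser.add_argument("host")             # sys.argv[4]
    parser.add_argument("port")             # sys.argv[5]
    parser.add_argument("schema_name")      # sys.argv[6]
    parser.add_argument("sid")              # sys.argv[7]
    parser.add_argument("profile_id")       # sys.argv[8]
    parser.add_argument("email")            # sys.argv[9]
    return parser.parse_args(argv)


# Placeholder values, passed explicitly so the sketch runs without a command line.
args = parse_args([
    "SAMPLE_CONN", "sampledb", "1", "db.example.com", "1521",
    "SAMPLE_SCHEMA", "samplesid", "2", "reviewer@example.com",
])
print(args.schema_name, args.email)
```

With this in place, running the script with too few arguments prints a usage message listing the expected names instead of failing partway through.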

api_functions.py

# Author : Sandeep Reddy Narani
# Created : 05/11/2020
# Purpose : Delphix API functions.
############################################################################
import requests
import sys
import csv
import re
import keyring
import time
import cryptography
import openpyxl
import cx_Oracle
import os
from base64 import b64encode, b64decode

def load_key():
    from cryptography.fernet import Fernet

    # Read the Fernet key used to decrypt the connector passwords in the CSV
    with open("key.txt", "rb") as keyfile:
        csvKey = keyfile.read()
    encKey = Fernet(csvKey)
    return encKey

def file_load():
    filePath = os.getcwd()+"/"
    #create errorLog
    errorLog = open(filePath+"errorLog.txt", "w")
    #create successLog
    successLog = open(filePath+"successLog.txt", "w")
    #create jobMapLog
    jobMapLog = open(filePath+"jobMapLog.txt", "w")
    #read in schemaTableList
    schemaTableList = "SchemaTableList.csv"
    #schemaTableList = open(filePath+"SchemaTableList.csv","w")
    #read in CSV list
    #connCSV = sys.argv[1]
    connCSV = "connCSV_output.csv"
    print("file creation successful")
    return filePath, errorLog, successLog, jobMapLog, schemaTableList, connCSV

def api_login(username, baseUrl):
    # login into engine
    #Initialize session
    session = requests.session()
    # login into Masking Engine
    loginUrl = baseUrl + 'login'
    LOGIN_DATA = {
        'type': 'LoginRequest',
        'username': username,
        'password': keyring.get_password('DEuser', username)
    }

    postLogin = session.post(loginUrl, json=LOGIN_DATA)
    #print (postLogin)
    if postLogin.status_code != 200:
        print('Masking engine credentials are incorrect. Exiting script.')
        sys.exit()
    # Add session to headers. Note: .json() method reads response as a dictionary.
    session.headers.update(postLogin.json())
    #auth = session.headers["Authorization"]
    print('Masking engine credentials are correct')
    return session

def create_connector(encKey, session, connectorName, databaseType, databaseName, environmentId, host, port, schemaName, sid, connUsername, connPassword, baseUrl):

    # Decrypt the connector password that is stored encrypted in the CSV
    connPwd = connPassword.encode('utf-8')

    connPwdDec = encKey.decrypt(connPwd)
    #print("pwd is now: "+connPwdDec.decode())
    connUrl = baseUrl + 'database-connectors'

    if databaseType == "MSSQL":
        CONNECTOR_DATA = {
            "connectorName": connectorName,
            "databaseType": databaseType,
            "databaseName": databaseName,
            "environmentId": environmentId,
            "host": host,
            "port": port,
            "schemaName": schemaName,
            "username": connUsername,
            "password": connPwdDec.decode()
        }
    elif databaseType == "ORACLE":
        CONNECTOR_DATA = {
            "connectorName": connectorName,
            "databaseType": databaseType,
            "environmentId": environmentId,
            "host": host,
            "password": connPwdDec.decode(),
            "port": port,
            "schemaName": schemaName,
            "sid": sid,
            "username": connUsername
        }

    #print("CONNECTOR_DATA = "+str(CONNECTOR_DATA))

    # call API to create connector
    postConn = session.post(connUrl, json=CONNECTOR_DATA)

    # if connector creation successful, retrieve variables to use for other functions
    if postConn.ok:
        # get JSON string of connector creation, pull out connector ID and DB schema name to use for other functions
        postConnData = postConn.json()
        connName = postConnData['connectorName']
        connName = connName.strip('\"')
        connName = str(connName)

        # create ruleset and profile job names, based on connector name
        insensitive_conn = re.compile(re.escape('conn'), re.IGNORECASE)

        #rsName = connName.replace("conn", "RS")
        rsName = insensitive_conn.sub('RS', connName)
        #prfName = connName.replace("conn", "PRF")
        prfName = insensitive_conn.sub('PRF', connName)

        mskName = insensitive_conn.sub('MSK', connName)

        # get schemaName from connector info to compare with schemas in rule set CSV
        currSchemaName = (postConnData['schemaName'])

        # get connector ID for rule set creation
        connID = (postConnData['databaseConnectorId'])

        # environment name for reporting
        envURL = baseUrl + 'environments/' + str(environmentId)
        getEnv = session.get(envURL)
        getEnvDATA = getEnv.json()
        envName = (getEnvDATA['environmentName'])

        print("connection creation successful")

        return envName, rsName, prfName, mskName, connID, currSchemaName
    else:
        # print("connector creation failed")
        # set all values to EMPTY and attempt to create connector from next line in CSV
        connID = "EMPTY"
        rsName = "EMPTY"
        prfName = "EMPTY"
        currSchemaName = "EMPTY"
        mskName = "EMPTY"
        envName = "EMPTY"
        # print("connID: "+connID)
        print("connection creation unsuccessful")

        return envName, rsName, prfName, mskName, connID, currSchemaName

def create_rule_set(session, connID, rsName, baseUrl):

    RSUrl = baseUrl + 'database-rulesets'

    #Json string to create a rule set
    RS_DATA = {
        "rulesetName": rsName,
        "databaseConnectorId": str(connID)
    }

    # create rule set
    postRS = session.post(RSUrl, json=RS_DATA)

    #get resulting Json string, capture ruleSet ID to use for next step
    postRSdata = postRS.json()
    rsID = (postRSdata['databaseRulesetId'])
    return rsID

# Load Oracle Instant Client
oh="c:/app/instantclient_19_8"
os.environ["ORACLE_HOME"]=oh
os.environ["PATH"]=oh+os.pathsep+os.environ["PATH"]
os.environ["NLS_LANG"]="AMERICAN_AMERICA.AL32UTF8"


#print("Running cx_Oracle version", cx_Oracle.version,"built at", cx_Oracle.buildtime)
#print("File:", cx_Oracle.__file__)
print("Oracle Instant Client Version:", ".".join(str(i) for i in cx_Oracle.clientversion()))

def loadoracle(filePath, encKey, host, port, schemaName, sid, connUsername, connPassword, databaseName):
    # Decrypt the database password stored in the connector CSV
    connPwd = connPassword.encode('utf-8')
    connPwdDec = encKey.decrypt(connPwd)
    dbpassword = connPwdDec.decode()
    #print (dbpassword)
    #dsn = cx_Oracle.makedsn(host= host, port= port, sid= sid)
    durl = host+":"+port+"/"+databaseName
    #print (durl)
    #connection = cx_Oracle.connect(user= connUsername, password= dbpassword, dsn=dsn)
    connection = cx_Oracle.Connection(user= connUsername, password= dbpassword, dsn= durl)
    sql = "SELECT owner as schemaName,object_name as tableName from dba_objects where object_type='TABLE' and owner= :1"
    cursor = connection.cursor()
    print ("Successful Connected to Oracle Database: " +str(sid))
    # Use the same file name that file_load() returns, so the file is found on
    # case-sensitive file systems; the with block closes it before it is read back.
    with open(filePath+"SchemaTableList.csv","w") as f:
        writer = csv.writer(f, lineterminator="\n", quoting=csv.QUOTE_NONE, escapechar='\\')
        cursor.execute(sql, (schemaName,))
        #cursor.execute("SELECT owner as schemaName,object_name as tableName from dba_objects where object_type='TABLE' and owner='BSAAML_MART'")
        col_names = [row[0] for row in cursor.description]
        writer.writerow(col_names)
        for row in cursor:
            writer.writerow(row)
    print ("SchemaTableList CSV file is created")
    connection.close()

def add_tables_to_rule_set(session, currSchemaName, rsID, errorLog, filePath, schemaTableList, baseUrl):
    print(filePath)
    print(schemaTableList)

    # Track rulesetId/tableName pairs that were already added
    usedList = []

    with open(filePath+schemaTableList) as schemafile:
        readCSV = csv.DictReader(schemafile, delimiter=',')
        for row in readCSV:
            #lineNumber = lineNumber + 1
            CSVSchemaName = row['SCHEMANAME']
            tableName = row['TABLENAME']

            #print("current rule set ID is: " + str(rsID) + "current schema is: " + currSchemaName + ". current schema in CSV is: " + CSVSchemaName)
            if currSchemaName == CSVSchemaName:
                #print("found match, adding tables to above mentioned schema")
                tableUrl = baseUrl + 'table-metadata'

                TABLE_DATA = {
                    "tableName": tableName,
                    "rulesetId": str(rsID)
                }

                postTbl = session.post(tableUrl, json=TABLE_DATA)

                if postTbl.ok:
                    print(CSVSchemaName + ":" + tableName + " table was successfully added:" + str(rsID) )
                    usedList.append(str(rsID) + tableName)
                else:
                    if (str(rsID) + tableName) in usedList:
                        print("this combination of rulesetId,tableName was already added: " + str(rsID) + "," + tableName)
                    else:
                        print("this combination of rulesetId,tableName is invalid " + currSchemaName + ", " + tableName)
                        errorLog.write("this combination of schema,tableName is invalid: " + currSchemaName + ", " + tableName+"\n")

def create_profile_job(session, rsID, profileId, prfName, baseUrl):

    addPrfURL = baseUrl + 'profile-jobs'

    PRF_DATA = {
        "jobName": prfName,
        "profileSetId": str(profileId),
        "rulesetId": str(rsID),
        "jobDescription": ""
    }
    postPRF = session.post(addPrfURL, json=PRF_DATA)

    postPRFdata = postPRF.json()

    prfJobID = (postPRFdata['profileJobId'])

    return prfJobID

def create_mask_job(session, rsID, mskName, baseUrl):

    addMskURL = baseUrl + 'masking-jobs'

    MSK_DATA = {
        "jobName": mskName,
        "rulesetId": str(rsID),
        "jobDescription": "",
        "feedbackSize": "100000",
        "maxMemory": "8192",
        "minMemory": "8192",
        "databaseMaskingOptions": {
            "commitSize": "10000"
        }
    }

    postMSK = session.post(addMskURL, json=MSK_DATA)

    postMSKdata = postMSK.json()

    mskJobID = (postMSKdata['maskingJobId'])

    return mskJobID

def run_job(session,prfJobID, baseUrl):

    runningJobCnt = 0

    jobID = prfJobID
    print("reading in current job ID: "+str(jobID))
    JOB_DATA = {
        "jobId": str(jobID)
    }
    prfUrl = baseUrl + "executions"
    #print (JOB_DATA)
    postJOB = session.post(prfUrl, json=JOB_DATA)
    if postJOB.ok:

        runningJobCnt = runningJobCnt + 1

    # Poll the execution every 15 seconds until it succeeds or fails
    while runningJobCnt > 0:
        time.sleep(15)

        postJOBdata = postJOB.json()
        executionId = postJOBdata['executionId']

        jobUri = baseUrl + "executions/" + str(executionId)
        job = session.get(jobUri)
        jobData = job.json()
        status = jobData['status']
        jobId = jobData['jobId']

        if status == "RUNNING":
            print("job is still running")
        elif status == "SUCCEEDED":
            print("completed job ID: " + str(jobId))
            #successLog.write("completed job ID: " + str(jobId)+"\n")
            runningJobCnt = runningJobCnt - 1
        elif status == "FAILED":
            print("Failure to run job ID" + str(jobId))
            #errorLog.write("Failure to run job ID" + str(jobId)+"\n")
            runningJobCnt = runningJobCnt - 1
        else:
            print("unknown job status")

    return jobID

def get_inventory(session, runJobID, baseUrl, execbaseUrl, filePath, schemaName):
    invURL = baseUrl + 'executions'
    #print(runJobID)
    #Json string to get execution Id using Jobid
    EX_DATA = {
        "jobId": str(runJobID)
    }
    #print (invURL)
    # Note: this POST starts another execution of the job and returns its executionId
    Report = session.post(invURL, json=EX_DATA)
    getExeRS = Report.json()
    #print(getExeRS)
    getExeID = (getExeRS['executionId'])
    print (getExeID)
    auth = session.headers["Authorization"]
    exeIDurl = execbaseUrl +str(getExeID)
    # Build the curl command that downloads the profiler report as a PDF
    bcurl = "curl -o job-report_"+schemaName+".pdf -X GET --header 'Accept: application/json' --header 'Authorization: " + auth +"' " + repr(exeIDurl)
    #print(bcurl)
    # Write the command to curl.sh, which the main script runs afterwards
    with open(filePath+'curl.sh', 'w') as f:
        f.write(bcurl + "\n")
    print ("curl file is created")
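
The run_job function above keeps polling for as long as the status is neither SUCCEEDED nor FAILED, so an unexpected status would leave it looping. A minimal sketch of a bounded polling loop follows, as one possible hardening rather than the author's method. The function name wait_for_execution and its parameters are hypothetical; get_status stands in for the GET on executions/<executionId>, and the terminal statuses are limited to the two the source handles.

```python
import time


def wait_for_execution(get_status, interval=15, timeout=3600, sleep=time.sleep):
    """Poll get_status() until a terminal status is seen or timeout elapses.

    get_status is any callable returning the current status string; in the
    script it would wrap the GET request on the execution URL.
    """
    waited = 0
    while True:
        status = get_status()
        # Terminal statuses taken from the source; extend the tuple if needed.
        if status in ("SUCCEEDED", "FAILED"):
            return status
        if waited >= timeout:
            raise TimeoutError("execution still " + status + " after "
                               + str(waited) + " seconds")
        sleep(interval)
        waited += interval


# Simulated status sequence, with a no-op sleep so the demo runs instantly.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
result = wait_for_execution(lambda: next(statuses), sleep=lambda seconds: None)
print(result)
```

Passing the sleep function as a parameter keeps the loop easy to exercise without waiting; in real use the default time.sleep applies.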

sendemail.py

# Author : Sandeep Reddy Narani
# Created : 05/11/2020
# Purpose : Send Delphix Downloaded Profiler Result to Email.
############################################################################

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import smtplib

def sendpdffile( dpdfname, rvemail):
    body = '''Please find the Delphix Profiler Results Attachment. Once you review the PII Elements please contact Delphix-DBA-Team to create Masking Job
'''
    # put your email here
    sender = 'dbadelphix@deeds.com'
    #password = 'xxxxxxxxxxxxxxxxxxxxxxxx'
    # put the email of the receiver here
    receiver = rvemail

    #Setup the MIME
    message = MIMEMultipart()
    message['From'] = sender
    message['To'] = receiver
    message['Subject'] = 'Delphix Profiler PII Scan Results'
    message.attach(MIMEText(body, 'plain'))

    #pdfname = 'job-report.pdf'
    #pdfname = filePath+dpdfname
    pdfname = dpdfname
    print (pdfname)

    payload = MIMEBase('application', 'octet-stream', Name=pdfname)
    # payload = MIMEBase('application', 'pdf', Name=pdfname)
    # open the file in binary mode and load it as the attachment payload
    with open(pdfname, 'rb') as binary_pdf:
        payload.set_payload(binary_pdf.read())

    # encoding the binary into base64
    encoders.encode_base64(payload)

    # add header with pdf name
    payload.add_header('Content-Disposition', 'attachment', filename=pdfname)
    message.attach(payload)

    # connect to the internal SMTP relay on port 25
    esession = smtplib.SMTP('mailhost.deeds.com', 25)
    #print (session)

    #enable security
    #session.starttls()

    #login with mail_id and password
    #session.login(sender, password)

    text = message.as_string()
    esession.sendmail(sender, receiver, text)
    esession.quit()
    print('Mail Sent')
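
For comparison, Python's standard library also offers email.message.EmailMessage, which sets the attachment's content type and Content-Disposition header in a single call. The sketch below is an illustration of that alternative, not the code used in this framework. The function name build_report_message, the addresses, and the PDF bytes are placeholders, and the message is only built, not sent.

```python
from email.message import EmailMessage


def build_report_message(sender, receiver, pdf_name, pdf_bytes):
    """Build a mail with the profiler report attached as application/pdf."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = receiver
    msg["Subject"] = "Delphix Profiler PII Scan Results"
    msg.set_content("Please find the Delphix profiler results attached.")
    # add_attachment base64-encodes the bytes and adds Content-Disposition
    msg.add_attachment(pdf_bytes, maintype="application", subtype="pdf",
                       filename=pdf_name)
    return msg


# Placeholder inputs; a real run would read the downloaded PDF from disk.
message = build_report_message("sender@example.com", "reviewer@example.com",
                               "job-report_SAMPLE.pdf", b"%PDF-1.4 placeholder")
attachment = next(message.iter_attachments())
print(attachment.get_filename(), attachment.get_content_type())
```

The resulting object can be handed to smtplib.SMTP.send_message in place of the sendmail call.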
