I have a Python script like the one below.
#!/usr/bin/env python
import sys

from pyspark import SparkContext
from pyspark.sql import HiveContext

# Check for the exact number of arguments on the spark-submit command line
if len(sys.argv) != 8:
    print("Invalid number of args......")
    print("Usage: spark-submit file.py table hivedb domain port mysqldb username password")
    sys.exit(1)

table = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

# sqlContext is not defined automatically in a plain script, so create it
sc = SparkContext()
sqlContext = HiveContext(sc)

# Highest id already loaded into the Hive table (0 if the table is empty)
lastval = sqlContext.sql(
    "select nvl(max(id),0) as maxval from {}.{}".format(hivedb, table)
).collect()[0].asDict()['maxval']

# Pull only the new rows from MySQL over JDBC
df = (sqlContext.read.format("jdbc")
      .option("url", "{}:{}/{}_pru".format(domain, port, mysqldb))
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "(select * from {} where id > {}) as {}".format(table, lastval, table))
      .option("user", username)
      .option("password", password)
      .load())
Now, in this script, I am using the table name to find out what type of table it is.
Sample table names: ring_test, ring_history
I will do

type = table.split("_")[-1]

(note: the index has to be [-1] or [1], not [0] — [0] would return "ring" for both sample names). If type equals "test", then the mysqldb and hivedb used in the queries above should be "testing". If type is not equal to "test", then pass "history" for mysqldb and hivedb.
How can I do that in python?
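One way to do it, as a minimal sketch: I am assuming the database names are literally "testing" and "history", and I use the name table_type rather than type so the built-in type() is not shadowed.

```python
def pick_db(table):
    """Return the database name to use for both mysqldb and hivedb."""
    # Classify by the suffix after the last underscore;
    # split("_")[0] would give "ring" for both sample names.
    table_type = table.split("_")[-1]
    if table_type == "test":
        return "testing"   # assumed literal database name
    return "history"       # assumed literal database name

# Derive both names from the table argument
mysqldb = hivedb = pick_db("ring_test")
print(pick_db("ring_test"))     # -> testing
print(pick_db("ring_history"))  # -> history
```

The returned name can then be substituted into both the Hive max-id query and the JDBC options in place of the values read from sys.argv.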