import os
from impala.dbapi import connect
import pandas as pd
def db_close(cur, conn):
cur.close()
conn.close()
def nvl(v):
return 0 if v is None else v
# 현재 작업 디렉토리 출력
print(os.getcwd())
conn = connect(host='192.168.91.156', port=21050)
cursor = conn.cursor()
# fetch schema list
schemaDf = pd.read_sql("SHOW SCHEMAS", conn)
# 테이블 리스트 저장
tableDf = pd.DataFrame(data=None, index=None, columns=None, dtype=None, copy=False)
for i in range(len(schemaDf)):
schemaName = schemaDf.loc[i, "name"]
tmpTableDf = pd.read_sql("SHOW TABLES IN " + schemaName, conn)
# 동일값으로 dbName 리스트 생성
schemaNameList = [schemaName for i in range(len(tmpTableDf))]
tmpTableDf.loc[:, "SCHEMA"] = schemaNameList
# 컬럼 순서 변경
tmpTableDf = tmpTableDf[["SCHEMA", "name"]]
# 컬럼이름 바꾸기
tmpTableDf = tmpTableDf.rename(columns={'name': 'TAB_ID'})
tableDf = tableDf.append(tmpTableDf, ignore_index=True)
# 합성컬럼 생성
tableDf["ID"] = tableDf["SCHEMA"] + "." + tableDf["TAB_ID"]
# 컬럼 순서 변경
tableDf = tableDf[["ID", "SCHEMA", "TAB_ID"]]
# 파이참에서 실행시 에러
# tabledf[tabledf["OWNER"] == "DM"]
# print("test",tabledf.loc[:,"OWNER"] )
# 컬럼 리스트 저장
colList01 = []
colList02 = []
colList03 = []
colList04 = []
colList05 = []
colList06 = []
colList07 = []
colList08 = []
colList09 = []
colList10 = []
colList11 = []
# job_date 추가 필요
for i in range(0,500):
tableId = tableDf.loc[i, "ID"]
df = pd.read_sql("DESCRIBE EXTENDED " + tableId, conn)
# 테이블 코멘트 조회
# Table Type: 이 다르다.
# DESCRIBE EXTENDED dm.anl_dd_mem_prfl_i
# DESCRIBE EXTENDED ccip.anl_new_cst_lst
df1 = df[df["type"].copy() == "comment "]
tmpTblList = [tableId for i in range(len(df1))]
df1.loc[:, "ID"] = tmpTblList
colList01.append(df1)
df2 = df[df["name"].copy() == "Table Type: "]
tmpTblList = [tableId for i in range(len(df2))]
df2.loc[:, "ID"] = tmpTblList
colList02.append(df2)
df3 = df[df["type"].copy() == "totalSize "]
tmpTblList = [tableId for i in range(len(df3))]
df3.loc[:, "ID"] = tmpTblList
colList03.append(df3)
df4 = df[df["name"].copy() == "InputFormat: "]
tmpTblList = [tableId for i in range(len(df4))]
df4.loc[:, "ID"] = tmpTblList
colList04.append(df4)
df5 = df[df["name"].copy() == "# Partition Information"]
tmpTblList = [tableId for i in range(len(df5))]
df5.loc[:, "ID"] = tmpTblList
colList05.append(df5)
if (df["name"] == "inst_dtm").any():
df6 = df[df["name"].copy() == "inst_dtm"]
tmpTblList = [tableId for i in range(len(df6))]
df6.loc[:, "ID"] = tmpTblList
colList06.append(df6)
if (df["name"] == "pfmc_yymmdd").any():
df7 = df[df["name"].copy() == "pfmc_yymmdd"]
tmpTblList = [tableId for i in range(len(df7))]
df7.loc[:, "ID"] = tmpTblList
colList07.append(df7)
if (df["name"] == "pfmc_yymm").any():
df8 = df[df["name"].copy() == "pfmc_yymm"]
tmpTblList = [tableId for i in range(len(df8))]
df8.loc[:, "ID"] = tmpTblList
colList08.append(df8)
if (df["name"] == "std_date").any():
df9 = df[df["name"].copy() == "std_date"]
tmpTblList = [tableId for i in range(len(df9))]
df9.loc[:, "ID"] = tmpTblList
colList09.append(df9)
if (df["name"] == "create_date").any():
df10 = df[df["name"].copy() == "create_date"]
tmpTblList = [tableId for i in range(len(df10))]
df10.loc[:, "ID"] = tmpTblList
colList10.append(df10)
if (df["name"] == "std_yymm").any():
df11 = df[df["name"].copy() == "std_yymm"]
tmpTblList = [tableId for i in range(len(df11))]
df11.loc[:, "ID"] = tmpTblList
colList11.append(df11)
# 컬럼 리스트 저장
for i in range(500,len(tableDf)):
tableId = tableDf.loc[i, "ID"]
df = pd.read_sql("DESCRIBE EXTENDED " + tableId, conn)
# 테이블 코멘트 조회
df1 = df[df["type"].copy() == "comment "]
tmpTblList = [tableId for i in range(len(df1))]
df1.loc[:, "ID"] = tmpTblList
colList01.append(df1)
df2 = df[df["name"].copy() == "Table Type: "]
tmpTblList = [tableId for i in range(len(df2))]
df2.loc[:, "ID"] = tmpTblList
colList02.append(df2)
df3 = df[df["type"].copy() == "totalSize "]
tmpTblList = [tableId for i in range(len(df3))]
df3.loc[:, "ID"] = tmpTblList
colList03.append(df3)
df4 = df[df["name"].copy() == "InputFormat: "]
tmpTblList = [tableId for i in range(len(df4))]
df4.loc[:, "ID"] = tmpTblList
colList04.append(df4)
df5 = df[df["name"].copy() == "# Partition Information"]
tmpTblList = [tableId for i in range(len(df5))]
df5.loc[:, "ID"] = tmpTblList
colList05.append(df5)
if (df["name"] == "inst_dtm").any():
df6 = df[df["name"].copy() == "inst_dtm"]
tmpTblList = [tableId for i in range(len(df6))]
df6.loc[:, "ID"] = tmpTblList
colList06.append(df6)
if (df["name"] == "pfmc_yymmdd").any():
df7 = df[df["name"].copy() == "pfmc_yymmdd"]
tmpTblList = [tableId for i in range(len(df7))]
df7.loc[:, "ID"] = tmpTblList
colList07.append(df7)
if (df["name"] == "pfmc_yymm").any():
df8 = df[df["name"].copy() == "pfmc_yymm"]
tmpTblList = [tableId for i in range(len(df8))]
df8.loc[:, "ID"] = tmpTblList
colList08.append(df8)
if (df["name"] == "std_date").any():
df9 = df[df["name"].copy() == "std_date"]
tmpTblList = [tableId for i in range(len(df9))]
df9.loc[:, "ID"] = tmpTblList
colList09.append(df9)
if (df["name"] == "create_date").any():
df10 = df[df["name"].copy() == "create_date"]
tmpTblList = [tableId for i in range(len(df10))]
df10.loc[:, "ID"] = tmpTblList
colList10.append(df10)
if (df["name"] == "std_yymm").any():
df11 = df[df["name"].copy() == "std_yymm"]
tmpTblList = [tableId for i in range(len(df11))]
df11.loc[:, "ID"] = tmpTblList
colList11.append(df11)
db_close(cursor, conn)
# df.append 사용시 성능 매우 저하됨. 사용 하지 말것
tableCommentDf = pd.concat(colList01)
objectTypeDf = pd.concat(colList02)
tableSizeDf = pd.concat(colList03)
tableFormatDf = pd.concat(colList04)
tablePartitionDf = pd.concat(colList05)
tableInstDtmDf = pd.concat(colList06)
tablePfmcYymmddDf = pd.concat(colList07)
tablePfmcYymmDf = pd.concat(colList08)
tableStdDateDf = pd.concat(colList09)
tableCreateDateDf = pd.concat(colList10)
tableStdYymmDf = pd.concat(colList11)
# 컬럼이름 바꾸기
tableCommentDf = tableCommentDf.rename(columns={'name': 'COL_ID', 'type': 'TYPE', 'comment': 'tableComment'})
objectTypeDf = objectTypeDf.rename(columns={'name': 'COL_ID', 'type': 'tableTYPE', 'comment': 'Comments'})
tableSizeDf = tableSizeDf.rename(columns={'type': 'tableSize', 'comment': 'tableSizeValue'})
tableFormatDf = tableFormatDf.rename(columns={'type': 'tableFormatValue'})
tablePartitionDf = tablePartitionDf.rename(columns={'name': 'tablePartition'})
tableInstDtmDf = tableInstDtmDf.rename(columns={'name': 'tableInstDtm'})
tablePfmcYymmddDf = tablePfmcYymmddDf.rename(columns={'name': 'tablePfmcYymmdd'})
tablePfmcYymmDf = tablePfmcYymmDf.rename(columns={'name': 'tablePfmcYymm'})
tableStdDateDf = tableStdDateDf.rename(columns={'name': 'tableStdDate'})
tableCreateDateDf = tableCreateDateDf.rename(columns={'name': 'tableCreateDate'})
tableStdYymmDf = tableStdYymmDf.rename(columns={'name': 'tableStdYymm'})
# left join
resultDf = pd.merge(left=tableDf, right=tableCommentDf, how="left", on="ID")
resultDf02 = pd.merge(left=resultDf, right=objectTypeDf, how="left", on="ID")
resultDf03 = pd.merge(left=resultDf02, right=tableSizeDf, how="left", on="ID")
resultDf04 = pd.merge(left=resultDf03, right=tableFormatDf, how="left", on="ID")
resultDf05 = pd.merge(left=resultDf04, right=tablePartitionDf, how="left", on="ID")
resultDf06 = pd.merge(left=resultDf05, right=tableInstDtmDf, how="left", on="ID")
resultDf07 = pd.merge(left=resultDf06, right=tablePfmcYymmddDf, how="left", on="ID")
resultDf08 = pd.merge(left=resultDf07, right=tablePfmcYymmDf, how="left", on="ID")
resultDf09 = pd.merge(left=resultDf08, right=tableStdDateDf, how="left", on="ID")
resultDf10 = pd.merge(left=resultDf09, right=tableCreateDateDf, how="left", on="ID")
resultDf11 = pd.merge(left=resultDf10, right=tableStdYymmDf, how="left", on="ID")
# KeyError: "['OWNER', 'TYPE', 'Comments'] not in index"
# print(resultDf06.head() )
resultDf11 = resultDf11[["ID", "SCHEMA", "TAB_ID", "tableFormatValue", "tableTYPE", "tableComment", "tableSizeValue", "tablePartition"
, "tableInstDtm", "tablePfmcYymmdd", "tablePfmcYymm", "tableStdDate", "tableCreateDate", "tableStdYymm" ]]
for i in range(0,len(resultDf11)):
tmp = resultDf11.loc[i, ["tableInstDtm","tablePfmcYymmdd","tablePfmcYymm", "tableStdDate", "tableCreateDate", "tableStdYymm" ]]
# print(len(nvl(resultDf11.loc[i,"tableInstDtm"])))
print(tmp)
# print(len(tmp))
if tmp == None:
print("aaaaaa")
if tmp == "nan":
print("bbbb")
if tmp is None:
print("ccc")
# if tmp.isna():
# print("is nan")
# else:
# print("else")
# if tmp == "nan":
# print("this is nan a")
# if len(tmp)
# print("this is true")
# print(len(nvl(resultDf11.loc[i,"tableInstDtm"])) + len(nvl(resultDf11.loc[i,"tablePfmcYymmdd"])) + len(nvl(resultDf11.loc[i,"tablePfmcYymm"])) + len(nvl(resultDf11.loc[i,"tableStdDate"])) + len(nvl(resultDf11.loc[i,"tableCreateDate"])) + len(nvl(resultDf11.loc[i,"tableStdYymm"])))
# resultDf11.loc[i, "NaN_cnt"] = len(nvl(resultDf11.loc[i,"tableInstDtm"])) + len(nvl(resultDf11.loc[i,"tablePfmcYymmdd"])) + len(nvl(resultDf11.loc[i,"tablePfmcYymm"])) + len(nvl(resultDf11.loc[i,"tableStdDate"])) + len(nvl(resultDf11.loc[i,"tableCreateDate"])) + len(nvl(resultDf11.loc[i,"tableStdYymm"]))
# resultDf11['NaN_cnt'] = resultDf11["tableInstDtm"].isnull().sum(1) + resultDf11["tablePfmcYymmdd"].isnull().sum(1)+ resultDf11["tablePfmcYymm"].isnull().sum(1) + resultDf11["tableStdDate"].isnull().sum(1)+ resultDf11["tableCreateDate"].isnull().sum(1) + resultDf11["tableStdYymm"].isnull().sum(1)
# 데이터 프레임 값 변경
resultDf11["tableFormatValue"] = resultDf11["tableFormatValue"].replace(["org.apache.hadoop.mapred.TextInputFormat"
, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
, "org.apache.kudu.mapreduce.KuduTableInputFormat"
, "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"]
, ["Text", "Parquet", "Kudu", "Avro"])
# view object table type NULL 타입 제거
resultDf11["tableFormatValue"] = resultDf11["tableFormatValue"].replace(["null "],[""])
print(resultDf11.columns)
# 엑셀로 저장
resultDf11.to_excel(excel_writer='Z:/20.Dev/01.Python/02.download/HadooptableList.xlsx')
# display(resultDf02)
print("end")