# 하둡 디렉토리내, 파일 목록을 모두 출력한다.
import os
import pandas as pd
import pymysql
from tqdm import tqdm
# 공통 함수 영역 시작
def db_close(cur, conn):
cur.close()
conn.close()
# 공통 함수 영역 종료
# 하둡내 테이블 정보를 가져와서 데이터 프레임에 저장
conn = pymysql.connect(host="192.168.91.158", user="datacenter", password="68ta19ghu6", db="hive", charset="utf8")
cur = conn.cursor()
sql = "USE hive"
cur.execute(sql)
# fetch hue coordinator info
sql = '''
SELECT concat(A.DB_NAME, '.', MST.TBL_NAME) AS ID
, A.DB_NAME
, MST.TBL_NAME
, C.INPUT_FORMAT
, MST.TBL_TYPE
, B.TBL_COMMENT
, '' AS USER_TBL_COMMENTS /* 사용자가 정리하는 코멘트 */
, B.NUMROWS
, B.TOTALSIZE
, B.TOTALSIZE_MB
, D.PKEY_NAME
, B.TRANSIENT_LASTDDLTIME
, B.DO_NOT_UPDATE_STATS
, B.EXTERNAL_VALUE
, B.STATS_GENERATED_VIA_STATS_TASK
, FROM_UNIXTIME(MST.CREATE_TIME) AS TBL_CREATE_TIME
/* 참고 정보 */
, MST.DB_ID
, A.DB_OWNER_NAME
, A.DB_OWNER_TYPE
, A.DB_LOCATION_URI
, MST.TBL_ID
, MST.OWNER
, MST.SD_ID /* TBL_ID, CD_ID 사이의 맵핑 테이블 */
, MST.VIEW_EXPANDED_TEXT /* 뷰생성 확장 쿼리 */
, MST.VIEW_ORIGINAL_TEXT /* 뷰생성 기본 쿼리 */
, C.CD_ID
, C.OUTPUT_FORMAT
, C.LOCATION
, C.SERDE_ID
, C.NUM_BUCKETS
, C.IS_COMPRESSED /* 전체 0 */
, C.IS_STOREDASSUBDIRECTORIES /* 전체 0 */
/* 쿼리생성 */
, CASE WHEN MST.TBL_TYPE = 'VIRTUAL_VIEW' THEN ''
ELSE concat('compute stats ', A.DB_NAME, '.', MST.TBL_NAME, ';')
END AS compute_qry
FROM hive.TBLS MST
LEFT OUTER JOIN (
/* 임팔라 DB 목록 */
SELECT DB_ID
, NAME AS DB_NAME
, OWNER_NAME AS DB_OWNER_NAME
, OWNER_TYPE AS DB_OWNER_TYPE
, DB_LOCATION_URI
FROM hive.DBS
) A
ON MST.DB_ID = A.DB_ID
LEFT OUTER JOIN (
/* 임팔라 테이블 부가정보 - 코멘트,건수,사이즈,최종수정일시 등 */
SELECT TBL_ID
, MAX(CASE WHEN PARAM_KEY = 'comment' THEN PARAM_VALUE END) AS TBL_COMMENT
, MAX(CASE WHEN PARAM_KEY = 'numRows' THEN PARAM_VALUE END) AS NUMROWS
, MAX(CASE WHEN PARAM_KEY = 'totalSize' THEN PARAM_VALUE END) AS TOTALSIZE
, ROUND(MAX(CASE WHEN PARAM_KEY = 'totalSize' THEN PARAM_VALUE END)/1024/1024) AS TOTALSIZE_MB
, MAX(CASE WHEN PARAM_KEY = 'transient_lastDdlTime' THEN FROM_UNIXTIME(PARAM_VALUE) END) AS TRANSIENT_LASTDDLTIME
, MAX(CASE WHEN PARAM_KEY = 'DO_NOT_UPDATE_STATS' THEN PARAM_VALUE END) AS DO_NOT_UPDATE_STATS
, MAX(CASE WHEN PARAM_KEY = 'EXTERNAL' THEN PARAM_VALUE END) AS EXTERNAL_VALUE
, MAX(CASE WHEN PARAM_KEY = 'STATS_GENERATED_VIA_STATS_TASK' THEN PARAM_VALUE END) AS STATS_GENERATED_VIA_STATS_TASK
FROM hive.TABLE_PARAMS
GROUP BY TBL_ID
) B
ON MST.TBL_ID = B.TBL_ID
LEFT OUTER JOIN (
/* kudu 타입일 경우 파티션 미존재 */
/* 테이블의 저장 포맷조회 - input,output은 현재 모두 동일 */
SELECT SD_ID /* TBLS의 SD_ID 와 맵핑 */
, CD_ID /* 컬럼정보의 CD_ID 와 맵핑 */
, CASE WHEN INPUT_FORMAT LIKE '%Avro%' THEN 'Avro'
WHEN INPUT_FORMAT LIKE '%Parquet%' THEN 'Parquet'
WHEN INPUT_FORMAT LIKE '%Text%' THEN 'Text'
WHEN INPUT_FORMAT LIKE '%Kudu%' THEN 'Kudu'
ELSE 'na'
END AS INPUT_FORMAT
, CASE WHEN OUTPUT_FORMAT LIKE '%Avro%' THEN 'Avro'
WHEN OUTPUT_FORMAT LIKE '%Parquet%' THEN 'Parquet'
WHEN OUTPUT_FORMAT LIKE '%Text%' THEN 'Text'
WHEN OUTPUT_FORMAT LIKE '%Kudu%' THEN 'Kudu'
ELSE 'na'
END AS OUTPUT_FORMAT
, LOCATION
, SERDE_ID
, NUM_BUCKETS
, IS_COMPRESSED /* 전체 0 */
, IS_STOREDASSUBDIRECTORIES /* 전체 0 */
FROM hive.SDS
) C
ON MST.SD_ID = C.SD_ID
LEFT OUTER JOIN (
/* 테이블 내 파티션 키 정보 - 파티션컬럼이 복수개일경우, 쉼표로 구분 */
SELECT TBL_ID
, GROUP_CONCAT(PKEY_NAME SEPARATOR ',') AS PKEY_NAME
, CASE WHEN LENGTH( GROUP_CONCAT(PKEY_COMMENT SEPARATOR ',') ) = 1 THEN ''
ELSE GROUP_CONCAT(PKEY_COMMENT SEPARATOR ',')
END AS PKEY_COMMENT
, CASE WHEN LENGTH( GROUP_CONCAT(PKEY_TYPE SEPARATOR ',') ) = 1 THEN ''
ELSE GROUP_CONCAT(PKEY_TYPE SEPARATOR ',')
END AS PKEY_TYPE
FROM hive.PARTITION_KEYS
GROUP BY TBL_ID
) D
ON MST.TBL_ID = D.TBL_ID
WHERE 1=1
ORDER BY CASE WHEN DB_NAME LIKE 'temp_%' THEN 98
WHEN DB_NAME = 'test' THEN 99
ELSE 0
END
, A.DB_NAME
, MST.TBL_NAME
'''
dbTabledf = pd.read_sql(sql, conn)
dir_path = r"Z:\01.현행시스템분석\05.하둡\03.hue\source_20211230"
# 데이터프레임 dir:파일경로, fileName:파일명
tabledf = pd.DataFrame(data=None, index=None, columns=["absolutePath", "dir", "fileName"], dtype=None, copy=False)
# 지정된 디렉토리 내 파일 리스트 가져와 데이터 프레임에 저장
for (root, directories, files) in os.walk(dir_path):
for file in files:
# .sql.crc 와 같은 파일내 포함 관계를 뽑아낸다. 확장자가 .sql만 뽑아 내지 않는다.
if file.endswith(".sql") | file.endswith(".SQL"):
file_path = os.path.join(root, file)
tempDict = {}
tempDict["dir"] = [root]
tempDict["fileName"] = [file]
tempDict["absolutePath"] = [file_path]
tabledf = tabledf.append(tempDict, ignore_index=True)
# 절대경로
# print(file_path)
# 파일명만 출력
# print(file)
# 디렉토리만 출력
# print(root)
# 원천 소스 테이블 저장
sourceTabledf = pd.DataFrame(data=None, index=None, columns=["AbsolutePath", "DirName", "FileName","INPUT_FORMAT","TBL_TYPE","DB_NAME","TBL_NAME","TBL_COMMENT" ], dtype=None, copy=False)
targetTabledf = pd.DataFrame(data=None, index=None, columns=["AbsolutePath", "TGT_TBL_NAME" ], dtype=None, copy=False)
for i in tqdm(range(len(tabledf))):
tempFileName = tabledf["fileName"][i][0]
tempDirName = tabledf["dir"][i][0]
tempAbsolutePath = tabledf["absolutePath"][i][0]
# 테스트용 특정 파일만 조회
# if tempFileName != "insert_dm_anl_mm_rglr_ctr_pfmc_wr_t.sql":
# continue
# print("start ", tempFileName)
# 파일내 일치 테이블 추출
f = open(tempAbsolutePath, 'r', encoding="utf-8")
lines = f.readlines()
for line in lines:
line = line.strip() # 줄 끝의 줄 바꿈 문자를 제거한다.
# 라인 내 insert 예약어가 존재하는 경우 원천 목록에서 제외
# 별도 컬럼에 테이블명 저장
if line.upper().find("INSERT") >= 0:
# 파일 라인별 일치 테이블 추출
for i in range(len(dbTabledf)):
tempDbTableId = dbTabledf["ID"][i]
# 파일내 일치 테이블이 존재하는 경우 저장: 이 로직은 테이블명이 유사할경우 에러 발생 예) anl_dd_ua, anl_dd
if line.upper().find(tempDbTableId.upper()) >= 0:
# 라인내 단어 구분자 재정의
# 파일내 일치 테이블이 존재하는 경우 저장: 이 로직은 테이블명이 유사할경우 에러 발생 처리 로직 예) anl_dd_ua, anl_dd
templine = line.replace(" ", "^").replace("(", "^").replace(")", "^")
lineList = templine.split("^")
for elem in lineList:
# 라인 내 테이블 정보가 찾으려는 테이블과 완전일치하는 경우
if tempDbTableId.upper() == elem.upper():
tempDict = {}
tempDict["AbsolutePath"] = tempAbsolutePath
tempDict["TGT_TBL_NAME"] = tempDbTableId.upper()
targetTabledf = targetTabledf.append(tempDict, ignore_index=True)
continue
# 파일 라인별 일치 테이블 추출
for i in range(len(dbTabledf)):
tempDbTableId = dbTabledf["ID"][i]
# 파일내 일치 테이블이 존재하는 경우 저장: 이 로직은 테이블명이 유사할경우 에러 발생 예) anl_dd_ua, anl_dd
if line.upper().find(tempDbTableId.upper()) >= 0:
# 라인내 단어 구분자 재정의
# 파일내 일치 테이블이 존재하는 경우 저장: 이 로직은 테이블명이 유사할경우 에러 발생 처리 로직 예) anl_dd_ua, anl_dd
line = line.replace(" ", "^").replace("(", "^").replace(")", "^")
lineList = line.split("^")
for elem in lineList:
# 라인 내 테이블 정보가 찾으려는 테이블과 완전일치하는 경우
if tempDbTableId.upper() == elem.upper():
tempDict = {}
tempDict["AbsolutePath"] = tempAbsolutePath
tempDict["DirName"] = tempDirName
tempDict["FileName"] = tempFileName
tempDict["INPUT_FORMAT"] = dbTabledf["INPUT_FORMAT"][i]
tempDict["TBL_TYPE"] = dbTabledf["TBL_TYPE"][i]
tempDict["DB_NAME"] = dbTabledf["DB_NAME"][i].upper()
tempDict["TBL_NAME"] = dbTabledf["TBL_NAME"][i].upper()
tempDict["TBL_COMMENT"] = dbTabledf["TBL_COMMENT"][i]
sourceTabledf = sourceTabledf.append(tempDict, ignore_index=True)
f.close()
# 중복제거
sourceTabledf = sourceTabledf.drop_duplicates(['FileName', 'DB_NAME', 'TBL_NAME'], keep="first")
# 정렬
sourceTabledf = sourceTabledf.sort_values(["FileName","DB_NAME","TBL_NAME"], ascending=True)
# 인덱스재설정
sourceTabledf = sourceTabledf.reset_index(drop=True)
# 일괄 대문자 변경
# sourceTabledf = sourceTabledf.applymap(str.upper)
# 타겟 테이블 정보 JOIN
sourceTabledf = pd.merge(left=sourceTabledf, right=targetTabledf, left_on='AbsolutePath', right_on='AbsolutePath', how='left')
# 1. 엑셀 파일 열기 w/ExcelWriter
writer = pd.ExcelWriter('Z:/20.Dev/01.Python/02.download/HdpSrcList.xlsx', engine='xlsxwriter')
# 2. 시트별 데이터 추가하기(1개 시트로 저장할때와 유사)
sourceTabledf.to_excel(writer, sheet_name="sourceTable")
# 3. 엑셀 파일 저장하기
writer.save()
# 4. db 접속 close
db_close(cur, conn)