# mapindrag/app.py
# (Source listing metadata: 928 lines, 34 KiB, Python)

import os, io, random, requests, openpyxl, concurrent.futures, argparse
from flask import Flask, render_template, request, jsonify, send_file
import logging
# Root logging is configured at DEBUG level with a timestamped format.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Uploaded spreadsheets are saved under this folder; request bodies are capped at 16 MiB.
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# (lat, lng) pair used, with random jitter, as the fallback coordinate when
# an address is empty or cannot be geocoded.
YONGSAN_CENTER = (37.5326, 126.9906)
# In-memory application state shared by every request handler.
store = {
# tab id -> tab dict as produced by empty_tab()
'tabs': {},
# id of the currently selected tab, or None when there are no tabs
'active_tab': None,
# counter used by add_tab() to mint the next 'tab_<n>' id
'next_id': 0,
# name recorded by the upload handlers; None until the first upload
'filename': None,
}
# Progress of the upload being processed; served by /api/upload-progress.
upload_progress = {
'active': False,
'stage': '',
'total': 0,
'current': 0,
'message': '',
# messages appended by update_progress(); reset at the start of each upload
'logs': [],
}
def tab_data(tab_id):
    """Look up a tab in the in-memory store.

    Returns ``(tab, None)`` when a truthy entry exists for ``tab_id`` and
    ``(None, 404)`` otherwise.
    """
    found = store['tabs'].get(tab_id)
    return (found, None) if found else (None, 404)
# Address string -> (lat, lng) result cached by geocode_with_fallback().
GEO_CACHE = {}
# Counters for the current geocoding run; the upload code resets them before
# each run and geocode_with_fallback() keeps at most 20 failed addresses.
GEO_STATS = {'success': 0, 'failed': 0, 'failed_addrs': []}
import time, re
# Geocoding API credentials are read from the environment.
KAKAO_API_KEY = os.environ.get('KAKAO_API_KEY', '')
# NOTE(review): a concrete key is hard-coded as the default value here; it looks
# like a real credential committed to source -- confirm and consider rotating it
# and requiring the environment variable instead.
VWORLD_API_KEY = os.environ.get('VWORLD_API_KEY', 'E1FD0345-8021-3C9C-A397-CE0D88736D78')
def geocode_kakao(address, timeout=10):
    """Geocode ``address`` with the Kakao Local address-search API.

    Args:
        address: Address string to look up (surrounding whitespace is stripped).
        timeout: Request timeout in seconds.

    Returns:
        ``(lat, lng)`` as floats taken from the first returned document, or
        ``None`` when no API key is configured, the request fails, or the
        response contains no documents.
    """
    if not KAKAO_API_KEY:
        return None
    try:
        resp = requests.get(
            'https://dapi.kakao.com/v2/local/search/address.json',
            params={'query': address.strip(), 'size': 1},
            headers={'Authorization': f'KakaoAK {KAKAO_API_KEY}'},
            timeout=timeout,
        )
        if not resp.ok:
            logger.debug(f' [카카오 HTTP 오류] 상태={resp.status_code}, address={address}')
            return None
        documents = resp.json().get('documents')
        if documents:
            first = documents[0]
            # Kakao reports longitude as 'x' and latitude as 'y'.
            return float(first['y']), float(first['x'])
    except Exception as e:
        # Geocoding is best-effort: record the failure instead of hiding it,
        # then let the caller fall through to the next provider.
        logger.debug(f' [카카오 예외] {address}: {e}')
    return None
def looks_like_road_address(addr):
    """Heuristically decide whether ``addr`` is a road-name address.

    An address counts as road-style when it contains a road-name suffix
    ('로', '길', '대로', '번길') and at least one digit appears at or after
    the first occurrence of that suffix.

    The first two suffixes were empty strings in the previous version
    (presumably characters lost in an encoding mishap), which made every
    address containing any digit match. This is only a heuristic: it is used
    to choose which VWorld address type to try first, so a false positive
    merely costs an extra request.

    Returns:
        ``True`` if the address looks like a road-name address, else ``False``.
    """
    road_suffixes = ('로', '길', '대로', '번길')
    for suffix in road_suffixes:
        pos = addr.find(suffix)
        if pos != -1 and any(ch.isdigit() for ch in addr[pos:]):
            return True
    return False
def geocode_vworld(address, timeout=15):
    """Geocode ``address`` through the VWorld address API.

    Several spellings of the address (as given, and prefixed with the
    city/district when those are missing) are tried against both the ROAD and
    PARCEL address types. The type order depends on
    ``looks_like_road_address``.

    Returns:
        ``(lat, lng)`` for the first successful lookup, or ``None`` when no
        API key is configured or every attempt fails.
    """
    if not VWORLD_API_KEY:
        return None

    addr = address.strip()
    has_district = '용산구' in addr
    has_city = '서울' in addr

    candidates = [addr]
    if not has_district and not has_city:
        candidates.append(f'서울특별시 용산구 {addr}')
    if not has_district:
        candidates.append(f'용산구 {addr}')
    # Drop an empty base address, as the original filtering did.
    candidates = [c for c in candidates if c]

    type_order = ['ROAD', 'PARCEL'] if looks_like_road_address(addr) else ['PARCEL', 'ROAD']
    endpoint = 'https://api.vworld.kr/req/address'

    for candidate in candidates:
        for addr_type in type_order:
            try:
                query = {
                    'service': 'address',
                    'request': 'getcoord',
                    'version': '2.0',
                    'crs': 'EPSG:4326',
                    'address': candidate,
                    'format': 'json',
                    'type': addr_type,
                    'key': VWORLD_API_KEY,
                }
                logger.debug(f' [VWorld 요청] type={addr_type}, address={candidate}')
                resp = requests.get(endpoint, params=query, timeout=timeout)
                if not resp.ok:
                    logger.warning(f' [VWorld HTTP 오류] 상태={resp.status_code}, 응답={resp.text[:200]}')
                    continue
                data = resp.json()
                logger.debug(f' [VWorld 응답] {data}')
                response = data.get('response', {})
                status = response.get('status')
                if status not in ('OK', 'SUCCESS'):
                    err_msg = response.get('error', {}).get('message', str(data))
                    logger.debug(f' [VWorld {addr_type} 실패] {candidate}: status={status}, 오류={err_msg}')
                    continue
                result = response.get('result')
                if not result:
                    continue
                point = result.get('point')
                if point and point.get('x') and point.get('y'):
                    # VWorld returns longitude in 'x' and latitude in 'y'.
                    lat = float(point['y'])
                    lng = float(point['x'])
                    logger.info(f' [VWorld 성공] {addr} (type={addr_type}) -> ({lat}, {lng})')
                    return lat, lng
            except Exception as e:
                logger.warning(f' [VWorld 예외] {candidate}: {e}')
                import traceback
                traceback.print_exc()
    return None
def normalize_korean_address(addr):
    """Apply a fixed list of regex substitutions to a Korean address string.

    The substitutions expand '서울시' to '서울특별시' and shorten '경기도' to
    '경기'. The third rule rewrites '<digits>동' to itself, so it leaves the
    text unchanged.
    """
    substitutions = (
        (r'서울시', '서울특별시'),
        (r'경기도', '경기'),
        (r'(\d+)동', r'\1동'),
    )
    normalized = addr
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized
def geocode_nominatim(address, timeout=10):
    """Geocode ``address`` with the public Nominatim (OpenStreetMap) search API.

    The normalized address is tried as-is and with city/country suffixes.
    Only hits inside the box 37 < lat < 38, 126.9 < lng < 127.1 are accepted.

    Args:
        address: Address string to look up.
        timeout: Per-request timeout in seconds.

    Returns:
        ``(lat, lng)`` for the first accepted hit, or ``None`` if there is none.
    """
    addr = normalize_korean_address(address.strip())
    url = 'https://nominatim.openstreetmap.org/search'
    headers = {'User-Agent': 'MapinDrag/1.0 (non-commercial research)'}
    queries = (
        addr,
        f'{addr}, 서울특별시, 대한민국',
        f'{addr}, 대한민국',
    )
    for query in queries:
        params = {
            'q': query,
            'format': 'json',
            'limit': 3,
            'countrycodes': 'kr',
            'accept-language': 'ko,en',
        }
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=timeout)
            if not resp.ok:
                logger.debug(f' [Nominatim HTTP 오류] 상태={resp.status_code}, address={query}')
                continue
            for hit in resp.json() or []:
                lat = float(hit['lat'])
                lng = float(hit['lon'])
                if 37 < lat < 38 and 126.9 < lng < 127.1:
                    return lat, lng
        except Exception as e:
            # Best-effort lookup: log the failure and move on to the next
            # query variant rather than swallowing it silently.
            logger.debug(f' [Nominatim 예외] {query}: {e}')
    return None
def geocode_with_fallback(address):
    """Resolve ``address`` to ``(lat, lng)``, trying each configured provider.

    Providers are tried in the order Kakao (if a key is set), VWorld (if a key
    is set), then Nominatim, each against several spellings of the address.
    Results, including the jittered fallback used on total failure, are
    cached in ``GEO_CACHE`` and counted in ``GEO_STATS``. An empty address
    returns a jittered ``YONGSAN_CENTER`` without touching cache or stats.
    """
    address = str(address or '').strip()
    if not address:
        offset = random.uniform(-0.005, 0.005)
        return (YONGSAN_CENTER[0] + offset, YONGSAN_CENTER[1] + offset)

    if address in GEO_CACHE:
        return GEO_CACHE[address]

    # Strip at most one leading city/district prefix.
    stripped = address
    for prefix in ('서울특별시 ', '서울시 ', '서울 ', '용산구 '):
        if stripped.startswith(prefix):
            stripped = stripped[len(prefix):]
            break

    mentions_district = '용산구' in address
    mentions_city = '서울' in address

    candidates = []
    if not mentions_district and not mentions_city:
        candidates.append('서울특별시 용산구 ' + stripped)
        candidates.append('용산구 ' + stripped)
    candidates.append(address)
    if mentions_city and not mentions_district:
        candidates.append(
            address.replace('서울특별시', '서울특별시 용산구').replace('서울시', '서울시 용산구')
        )
    # Order-preserving de-duplication.
    unique_candidates = list(dict.fromkeys(candidates))

    providers = []
    if KAKAO_API_KEY:
        providers.append(('카카오', geocode_kakao))
    if VWORLD_API_KEY:
        providers.append(('VWorld', geocode_vworld))
    providers.append(('Nominatim', geocode_nominatim))

    result = None
    used_api = None
    for api_name, geo_func in providers:
        if result:
            break
        for candidate in unique_candidates:
            try:
                result = geo_func(candidate)
            except Exception as e:
                logger.debug(f'{api_name} 시도 실패 ({candidate}): {e}')
                continue
            if result:
                used_api = api_name
                break

    if not result:
        GEO_STATS['failed'] += 1
        if len(GEO_STATS['failed_addrs']) < 20:
            GEO_STATS['failed_addrs'].append(address)
        logger.warning(f' [지오코딩 실패 - 폴백 사용] {address}')
        offset = random.uniform(-0.005, 0.005)
        result = (YONGSAN_CENTER[0] + offset, YONGSAN_CENTER[1] + offset)
        GEO_CACHE[address] = result
        return result

    GEO_STATS['success'] += 1
    logger.info(f' [지오코딩 성공 ({used_api})] {address} -> {result}')
    GEO_CACHE[address] = result
    return result
def empty_tab(name):
    """Build a fresh tab record with the given name and no data."""
    return dict(name=name, workers=[], workplaces=[], assignments={})
# ── Tab management ──
@app.route('/api/tabs')
def list_tabs():
    """Return a summary of every tab plus the id of the active tab."""
    summaries = []
    for tab_id, tab in store['tabs'].items():
        summaries.append({
            'id': tab_id,
            'name': tab['name'],
            'workerCount': len(tab['workers']),
            'assignCount': len(tab['assignments']),
        })
    return jsonify({'tabs': summaries, 'activeTab': store['active_tab']})
@app.route('/api/upload-progress')
def get_upload_progress():
    """Return the current upload progress, with only the 15 most recent log lines."""
    snapshot = {
        field: upload_progress[field]
        for field in ('active', 'stage', 'total', 'current', 'message')
    }
    snapshot['logs'] = upload_progress['logs'][-15:]
    return jsonify(snapshot)
def update_progress(stage, message='', total=0, current=0):
    """Record the current upload stage and counters in ``upload_progress``.

    A non-empty ``message`` is also appended to the progress log and written
    to the application logger.
    """
    upload_progress.update(stage=stage, message=message, total=total, current=current)
    if not message:
        return
    upload_progress['logs'].append(message)
    logger.info(f'[진행: {stage}] {message}')
@app.route('/api/tabs/add', methods=['POST'])
def add_tab():
    """Create a new empty tab, make it active, and return its id and name.

    When no name is supplied the tab is named after its 1-based sequence number.
    """
    payload = request.get_json() or {}
    sequence = store['next_id']
    requested = (payload.get('name') or '').strip()
    name = requested if requested else f'{sequence + 1}'
    tid = f'tab_{sequence}'
    store['next_id'] = sequence + 1
    store['tabs'][tid] = empty_tab(name)
    store['active_tab'] = tid
    return jsonify({'id': tid, 'name': name, 'activeTab': tid})
@app.route('/api/tabs/rename', methods=['POST'])
def rename_tab():
    """Rename an existing tab; respond 400 if the id or the new name is missing or unknown."""
    payload = request.get_json() or {}
    tid = payload.get('id')
    name = (payload.get('name') or '').strip()
    if not (tid and name and tid in store['tabs']):
        return jsonify({'error': '잘못된 요청'}), 400
    store['tabs'][tid]['name'] = name
    return jsonify({'id': tid, 'name': name})
@app.route('/api/tabs/delete', methods=['POST'])
def delete_tab():
    """Delete a tab; if it was active, activate the first remaining tab (or none)."""
    payload = request.get_json() or {}
    tid = payload.get('id')
    tabs = store['tabs']
    if not tid or tid not in tabs:
        return jsonify({'error': '잘못된 요청'}), 400
    del tabs[tid]
    if store['active_tab'] == tid:
        # First key in insertion order, or None when no tabs are left.
        store['active_tab'] = next(iter(tabs), None)
    return jsonify({'deleted': tid})
@app.route('/api/tabs/activate', methods=['POST'])
def activate_tab():
    """Mark the requested tab as active; respond 400 when it does not exist."""
    payload = request.get_json() or {}
    tid = payload.get('id')
    if tid and tid in store['tabs']:
        store['active_tab'] = tid
        return jsonify({'activeTab': tid})
    return jsonify({'error': '탭을 찾을 수 없음'}), 400
# ── Data ──
def get_active():
    """Fetch the active tab.

    Returns a 3-tuple: ``(tab, None, None)`` on success, or
    ``(None, error_response, 400)`` when there is no active tab.
    """
    active = store['tabs'].get(store['active_tab'])
    if active:
        return active, None, None
    return None, jsonify({'error': '탭 없음'}), 400
@app.route('/api/vworld-api-status')
def vworld_api_status():
    """Check whether the configured VWorld API key is usable.

    Sends one test geocoding request. If the service answers with a
    recognised status the key is reported as valid and echoed back in the
    response; otherwise ``{'valid': False, 'key': None}`` is returned.
    """
    if VWORLD_API_KEY:
        # Probe the API with a simple, well-known address.
        probe = {
            'service': 'address',
            'request': 'getcoord',
            'version': '2.0',
            'crs': 'EPSG:4326',
            'address': '서울특별시 용산구',
            'format': 'json',
            'type': 'PARCEL',
            'key': VWORLD_API_KEY,
        }
        try:
            resp = requests.get('https://api.vworld.kr/req/address', params=probe, timeout=5)
            if resp.ok:
                status = resp.json().get('response', {}).get('status')
                # Any of these statuses is taken to mean the key was accepted.
                if status in ('OK', 'SUCCESS', 'FAIL'):
                    return jsonify({'valid': True, 'key': VWORLD_API_KEY})
        except Exception as e:
            logger.debug(f'VWorld API 테스트 실패: {e}')
    return jsonify({'valid': False, 'key': None})
@app.route('/api/data')
def get_data():
    """Return workers, workplaces and assignments for the requested (or active) tab.

    An unknown tab yields empty collections rather than an error.
    """
    tid = request.args.get('tab') or store['active_tab']
    tab = store['tabs'].get(tid)
    if tab:
        payload = {key: tab[key] for key in ('workers', 'workplaces', 'assignments')}
    else:
        payload = {'workers': [], 'workplaces': [], 'assignments': {}}
    return jsonify(payload)
def find_header_idx(headers, keywords):
    """Return the index of the first header containing one of ``keywords``.

    Keywords are tried in order, so an earlier keyword wins even if a later
    one matches a header further to the left. Returns ``None`` when nothing
    matches.
    """
    for keyword in keywords:
        position = next(
            (pos for pos, header in enumerate(headers) if keyword in header),
            None,
        )
        if position is not None:
            return position
    return None
def is_likely_worker_sheet(sheet_name, headers):
    """Guess whether a sheet lists workers (True) or workplaces (False).

    The sheet name is checked first: a worker keyword wins, then a workplace
    keyword. If the name is inconclusive, the sheet counts as a worker sheet
    when at least two of the wish / birth-date / phone columns are present.
    """
    lowered = sheet_name.lower()

    for word in ('근무자', '참관인', '인력', '선거인', 'worker'):
        if word in lowered:
            return True
    for word in ('근무처', '투표소', '장소', '기관', 'workplace', 'poll'):
        if word in lowered:
            return False

    indicator_keywords = (
        ['희망사항', '희망', '배치희망'],
        ['생년월일', '생일', '주민번호'],
        ['연락처', '전화', '휴대폰', '핸드폰'],
    )
    found = [find_header_idx(headers, kws) for kws in indicator_keywords]
    present = [idx for idx in found if idx is not None]
    return len(present) >= 2
def download_from_google_drive(file_id, timeout=30, max_bytes=16 * 1024 * 1024):
    """Download a publicly shared file from Google Drive into memory.

    Args:
        file_id: Google Drive file id; passed as a query parameter so it is
            URL-encoded rather than spliced into the URL.
        timeout: Request timeout in seconds.
        max_bytes: Largest download accepted (default 16 MiB).

    Returns:
        An ``io.BytesIO`` positioned at offset 0 with the file contents, or
        ``None`` on HTTP error, timeout, empty body, oversize file, or any
        other failure (all of which are logged).
    """
    url = 'https://drive.google.com/uc'
    params = {'id': file_id, 'export': 'download'}
    try:
        logger.info(f'구글 드라이브에서 파일 다운로드 중: {file_id}')
        # The context manager releases the streamed connection on every path.
        with requests.get(url, params=params, timeout=timeout,
                          allow_redirects=True, stream=True) as resp:
            if not resp.ok:
                logger.error(f'구글 드라이브 다운로드 실패: 상태코드={resp.status_code}')
                return None

            # Content-Length is optional (e.g. chunked responses), so use it
            # only as an early rejection and measure the real size below.
            declared = resp.headers.get('content-length', '')
            if declared.isdigit() and int(declared) > max_bytes:
                logger.error(f'파일이 너무 큽니다: {declared} bytes')
                return None

            buffer = io.BytesIO()
            for chunk in resp.iter_content(chunk_size=64 * 1024):
                buffer.write(chunk)
                if buffer.tell() > max_bytes:
                    logger.error(f'파일이 너무 큽니다: {buffer.tell()} bytes')
                    return None

        file_size = buffer.tell()
        if file_size == 0:
            logger.warning('다운로드한 파일의 크기가 0입니다')
            return None
        buffer.seek(0)
        logger.info(f'파일 다운로드 완료: {file_size} bytes')
        return buffer
    except requests.exceptions.Timeout:
        logger.error(f'구글 드라이브 다운로드 타임아웃: {file_id}')
        return None
    except Exception as e:
        logger.error(f'구글 드라이브 다운로드 중 오류: {e}')
        return None
def _cell_text(row, idx):
    """Return the stripped text of ``row[idx]``; '' for an empty cell or a row too short."""
    if idx is None or idx >= len(row):
        return ''
    return str(row[idx] or '').strip()


def _fallback_coords():
    """Return YONGSAN_CENTER with independent random jitter on each axis."""
    return (YONGSAN_CENTER[0] + random.uniform(-0.005, 0.005),
            YONGSAN_CENTER[1] + random.uniform(-0.005, 0.005))


def _parse_sheet(ws, sheet_name):
    """Read one worksheet into ``(is_worker_sheet, rows)`` without coordinates.

    Each row is a dict with 'name' and 'address'; worker rows also carry
    'hope', 'dob' and 'phone'. Rows that are entirely empty or have no name
    are skipped.
    """
    headers = [str(c.value or '').strip() for c in ws[1]]
    logger.info(f' 시트: {sheet_name}, 헤더: {headers}')
    update_progress('엑셀 파싱', f'시트 처리 중: {sheet_name}')

    hope_idx = find_header_idx(headers, ['희망사항', '희망', '배치희망'])
    is_worker_sheet = is_likely_worker_sheet(sheet_name, headers)
    name_idx = find_header_idx(headers, ['이름', '성명', '명칭', '근무지명', '투표소명'])
    addr_idx = find_header_idx(headers, ['주소', '위치', '소재지'])
    logger.info(f' → 희망사항: {hope_idx}, 이름: {name_idx}, 주소: {addr_idx}, 근무자시트: {is_worker_sheet}')

    if name_idx is None:
        name_idx = 0
    if addr_idx is None:
        addr_idx = 2 if is_worker_sheet else 1
        logger.warning(f' [주의] 주소 컬럼을 찾을 수 없어 기본 인덱스 {addr_idx} 사용')

    extra_columns = {}
    if is_worker_sheet:
        dob_idx = find_header_idx(headers, ['생년월일', '생일'])
        phone_idx = find_header_idx(headers, ['연락처', '전화', '휴대폰'])
        # Use explicit None checks: a detected column at index 0 is valid and
        # must not be replaced by the default (``idx or default`` did that).
        extra_columns = {
            'hope': 1 if hope_idx is None else hope_idx,
            'dob': 3 if dob_idx is None else dob_idx,
            'phone': 4 if phone_idx is None else phone_idx,
        }

    rows = []
    for row in ws.iter_rows(min_row=2, values_only=True):
        if not any(row):
            continue
        name = _cell_text(row, name_idx)
        if not name:
            continue
        record = {'name': name, 'address': _cell_text(row, addr_idx)}
        for field, idx in extra_columns.items():
            record[field] = _cell_text(row, idx)
        rows.append(record)
    return is_worker_sheet, rows


def _geocode_addresses(addresses):
    """Geocode every address, updating progress and GEO_STATS; return addr -> (lat, lng)."""
    total_addrs = len(addresses)
    logger.info(f'수집된 고유 주소 수: {total_addrs}')
    GEO_STATS['success'] = 0
    GEO_STATS['failed'] = 0
    GEO_STATS['failed_addrs'] = []

    # Label the run after the provider geocode_with_fallback tries first
    # (Kakao before VWorld before Nominatim).
    if KAKAO_API_KEY:
        api_name = '카카오 API'
        delay = 0.1
    elif VWORLD_API_KEY:
        api_name = 'VWorld API'
        delay = 0.1
    else:
        api_name = 'Nominatim (API 키 권장)'
        logger.info(' (참고: VWorld API 키 발급받으시면 정확도와 속도가 크게 향상됩니다)')
        delay = 1.0

    logger.info(f'=== 지오코딩 시작 ({api_name}) ===')
    update_progress('지오코딩', f'주소를 좌표로 변환 중 ({api_name})', total=total_addrs, current=0)

    geo_results = {}
    for idx, addr in enumerate(addresses, 1):
        logger.info(f' [{idx}/{total_addrs}] {addr}')
        geo_results[addr] = geocode_with_fallback(addr)
        if idx % 5 == 0 or idx == total_addrs:
            update_progress('지오코딩', f'좌표 변환 중... {idx}/{total_addrs}', total=total_addrs, current=idx)
        if idx < total_addrs:
            # Pause between lookups to stay polite to the geocoding service.
            time.sleep(delay)

    logger.info(f'=== 지오코딩 완료: 성공 {GEO_STATS["success"]}, 실패 {GEO_STATS["failed"]} ===')
    if GEO_STATS['failed_addrs']:
        logger.warning(f'실패한 주소 목록 (최대 20개): {GEO_STATS["failed_addrs"]}')
    return geo_results


def process_excel_file(wb, filename):
    """Parse a workbook into worker and workplace records with coordinates.

    Each sheet is classified as a worker or workplace sheet, its rows are
    read once, every distinct non-empty address is geocoded, and the records
    are then assembled. Rows whose address is empty get a jittered fallback
    coordinate.

    Args:
        wb: An openpyxl workbook.
        filename: Name of the source file (currently unused; kept for callers).

    Returns:
        ``(workers, workplaces)``: two lists of dicts. Workers have keys
        id, name, hope, address, dob, phone, lat, lng; workplaces have
        id, name, address, lat, lng.
    """
    parsed_sheets = []
    all_addrs = set()
    for sheet_name in wb.sheetnames:
        is_worker_sheet, rows = _parse_sheet(wb[sheet_name], sheet_name)
        label = '근무자' if is_worker_sheet else '근무처'
        logger.info(f' → {label} 행 수: {len(rows)}')
        parsed_sheets.append((is_worker_sheet, rows))
        all_addrs.update(r['address'] for r in rows if r['address'])

    geo_results = _geocode_addresses(all_addrs)

    workers, workplaces = [], []
    for is_worker_sheet, rows in parsed_sheets:
        for record in rows:
            coords = geo_results.get(record['address'])
            if coords is None:
                coords = _fallback_coords()
            lat, lng = coords
            if is_worker_sheet:
                workers.append({
                    'id': f'w{len(workers)}', 'name': record['name'],
                    'hope': record['hope'],
                    'address': record['address'],
                    'dob': record['dob'],
                    'phone': record['phone'],
                    'lat': lat, 'lng': lng,
                })
            else:
                workplaces.append({
                    'id': f'p{len(workplaces)}', 'name': record['name'],
                    'address': record['address'], 'lat': lat, 'lng': lng,
                })

    logger.info(f'=== 처리 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}개 ===')
    return workers, workplaces
@app.route('/api/upload', methods=['POST'])
def upload():
    """Handle an uploaded Excel file and load it into the requested (or active) tab.

    The file is saved under UPLOAD_FOLDER, parsed and geocoded by
    ``process_excel_file``, and the tab's workers/workplaces are replaced and
    its assignments cleared.

    Returns:
        JSON with 'workers', 'workplaces' and 'assignments' on success;
        a 400 JSON error when the tab or file is missing or processing fails.
    """
    tid = request.form.get('tab') or store['active_tab']
    t = store['tabs'].get(tid)
    if not t:
        return jsonify({'error': '탭 없음'}), 400
    file = request.files.get('file')
    if not file:
        return jsonify({'error': '파일이 없습니다.'}), 400

    # The filename comes from the client: keep only its final path component
    # so it cannot escape the upload folder.
    safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        safe_name = 'upload.xlsx'

    upload_progress.update({
        'active': True,
        'stage': '시작',
        'total': 0,
        'current': 0,
        'message': '',
        'logs': [],
    })
    try:
        update_progress('파일 로딩', f'파일을 읽는 중: {safe_name}')
        path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
        file.save(path)
        wb = openpyxl.load_workbook(path, data_only=True)
        logger.info(f'=== 엑셀 업로드 시작: {safe_name} ===')
        logger.info(f'시트 목록: {wb.sheetnames}')

        # Shared with the Google Drive upload path instead of a private copy.
        workers, workplaces = process_excel_file(wb, safe_name)

        logger.info(f'=== 업로드 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}개 ===')
        t['workers'] = workers
        t['workplaces'] = workplaces
        t['assignments'] = {}
        store['filename'] = safe_name
        update_progress('완료', f'처리 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}')
        return jsonify({'workers': workers, 'workplaces': workplaces, 'assignments': {}})
    except Exception as e:
        logger.exception(f'엑셀 파일 처리 중 오류: {e}')
        return jsonify({'error': f'파일 처리 중 오류: {str(e)}'}), 400
    finally:
        # Always clear the flag so the progress poller never sees a stuck upload.
        upload_progress['active'] = False
@app.route('/api/google-drive-upload', methods=['POST'])
def google_drive_upload():
    """Download an Excel file from Google Drive and load it into a tab.

    Expects a JSON body with 'fileId' and optionally 'tab'. On success the
    tab's workers/workplaces are replaced and its assignments cleared.

    Returns:
        JSON with 'workers', 'workplaces' and 'assignments' on success;
        a 400 JSON error when the file id or tab is missing, the download
        fails, or the workbook cannot be processed.
    """
    body = request.get_json() or {}
    # Coerce defensively: a null or non-string fileId must yield a 400, not a crash.
    file_id = str(body.get('fileId') or '').strip()
    tid = body.get('tab') or store['active_tab']
    if not file_id:
        return jsonify({'error': '파일 ID가 없습니다.'}), 400
    t = store['tabs'].get(tid)
    if not t:
        return jsonify({'error': '탭 없음'}), 400

    upload_progress.update({
        'active': True,
        'stage': '시작',
        'total': 0,
        'current': 0,
        'message': '',
        'logs': [],
    })
    try:
        update_progress('파일 로딩', '구글 드라이브에서 파일을 다운로드 중입니다...')
        # Download the file from Google Drive.
        file_content = download_from_google_drive(file_id)
        if not file_content:
            return jsonify({'error': '파일을 다운로드할 수 없습니다. 파일 ID를 확인하고 파일이 공유되어 있는지 확인하세요.'}), 400

        # Read the downloaded bytes as an Excel workbook.
        update_progress('파일 파싱', '엑셀 파일을 읽는 중입니다...')
        wb = openpyxl.load_workbook(file_content, data_only=True)
        logger.info(f'=== 구글 드라이브 엑셀 업로드 시작: {file_id} ===')
        logger.info(f'시트 목록: {wb.sheetnames}')

        # Parse and geocode the workbook.
        workers, workplaces = process_excel_file(wb, f'GoogleDrive_{file_id}.xlsx')

        # Store the result in the tab.
        t['workers'] = workers
        t['workplaces'] = workplaces
        t['assignments'] = {}
        store['filename'] = f'GoogleDrive_{file_id}'
        update_progress('완료', f'처리 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}')
        logger.info(f'=== 구글 드라이브 업로드 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}개 ===')
        return jsonify({'workers': workers, 'workplaces': workplaces, 'assignments': {}})
    except Exception as e:
        # logger.exception records the traceback through the logging system.
        logger.exception(f'엑셀 파일 처리 중 오류: {e}')
        return jsonify({'error': f'파일 처리 중 오류: {str(e)}'}), 400
    finally:
        # Single place that clears the flag for every exit path.
        upload_progress['active'] = False
@app.route('/api/assign', methods=['POST'])
def assign():
    """Assign a worker to a workplace in the requested (or active) tab.

    Responds 400 when the tab is unknown or either id is missing; otherwise
    returns the tab's full assignment map.
    """
    payload = request.get_json() or {}
    tab = store['tabs'].get(payload.get('tab') or store['active_tab'])
    if not tab:
        return jsonify({'error': '탭 없음'}), 400
    worker_id = payload.get('workerId')
    workplace_id = payload.get('workplaceId')
    if not (worker_id and workplace_id):
        return jsonify({'error': '잘못된 요청'}), 400
    tab['assignments'][worker_id] = workplace_id
    return jsonify({'assignments': tab['assignments']})
@app.route('/api/unassign', methods=['POST'])
def unassign():
    """Remove a worker's assignment (if any) and return the remaining assignments."""
    payload = request.get_json() or {}
    tab = store['tabs'].get(payload.get('tab') or store['active_tab'])
    if not tab:
        return jsonify({'error': '탭 없음'}), 400
    assignments = tab['assignments']
    # Removing an unknown worker id is a no-op.
    assignments.pop(payload.get('workerId'), None)
    return jsonify({'assignments': assignments})
@app.route('/api/reset', methods=['POST'])
def reset():
    """Clear all assignments of the requested (or active) tab.

    An unknown tab is not an error; the response simply carries an empty map.
    """
    payload = request.get_json() or {}
    tab = store['tabs'].get(payload.get('tab') or store['active_tab'])
    if not tab:
        return jsonify({'assignments': {}})
    tab['assignments'] = {}
    return jsonify({'assignments': tab['assignments']})
@app.route('/api/export')
def export():
    """Export the requested (or active) tab's assignments as an .xlsx download.

    The workbook has two sheets: one row per worker with the assigned
    workplace, and one row per workplace with its assigned workers.

    Returns:
        The generated workbook as an attachment, or a 400 JSON error when the
        tab does not exist.
    """
    tid = request.args.get('tab') or store['active_tab']
    t = store['tabs'].get(tid)
    if not t:
        return jsonify({'error': '탭 없음'}), 400

    wb = openpyxl.Workbook()
    ws1 = wb.active
    ws1.title = '배치 현황'
    ws1.append(['이름', '연락처', '생년월일', '희망사항', '주소', '배치근무지', '근무지주소'])
    wp_lookup = {wp['id']: wp for wp in t['workplaces']}

    # Group worker names by workplace while writing sheet 1, so sheet 2 does
    # not have to rescan every worker for every workplace.
    names_by_workplace = {}
    for w in t['workers']:
        a_id = t['assignments'].get(w['id'])
        wp = wp_lookup.get(a_id) if a_id else None
        if wp is not None:
            names_by_workplace.setdefault(a_id, []).append(w['name'])
        wn = wp['name'] if wp is not None else ''
        wa = wp['address'] if wp is not None else ''
        ws1.append([w['name'], w['phone'], w['dob'], w['hope'], w['address'], wn, wa])

    ws2 = wb.create_sheet('근무지별 배치')
    ws2.append(['근무지명', '주소', '배치인원', '배치된근무자'])
    for wp in t['workplaces']:
        aw = names_by_workplace.get(wp['id'], [])
        ws2.append([wp['name'], wp['address'], len(aw), ', '.join(aw)])

    buf = io.BytesIO()
    wb.save(buf)
    buf.seek(0)

    tab_name = t['name']
    # store['filename'] exists but is None until the first upload, so a
    # .get() default would never apply; fall back explicitly.
    fname = store.get('filename') or 'export.xlsx'
    base, ext = os.path.splitext(fname)
    # Names recorded for Google Drive uploads carry no extension.
    dn = f'{base}_{tab_name}_배치결과{ext or ".xlsx"}'
    return send_file(buf, as_attachment=True, download_name=dn,
                     mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
@app.route('/')
def index():
    """Serve the single-page UI."""
    page = render_template('index.html')
    return page
if __name__ == '__main__':
    # Command-line entry point: start the Flask server (debug mode enabled)
    # on the port given by --port.
    cli = argparse.ArgumentParser()
    cli.add_argument('--port', type=int, default=5000, help='Port to run the server on')
    port = cli.parse_args().port
    app.run(debug=True, port=port)