Feat: 근무처 목록 및 배지 표시, 엑셀 업로드 진행 팝업 추가

- 근무처 마커에 할당 인원 배지 표시
- 근무처 목록 패널 추가 (탭 전환 가능)
- 탭 생성 후 F5 없이 즉시 표시되도록 버그 수정
- VWorld/카카오 지오코딩 API 지원 추가 (type 파라미터 포함)
- 엑셀 업로드 중 진행 상황 팝업 모달 추가
- 근무처 목록 근무자 표시 3명 → 4명으로 변경
- 지오코딩 진행 상황 폴링 API 추가
This commit is contained in:
user01
2026-05-13 12:45:22 +09:00
parent a5db41187c
commit 43f23c52fc
3 changed files with 680 additions and 59 deletions

443
app.py
View File

@@ -1,5 +1,9 @@
import os, io, random, requests, openpyxl, concurrent.futures
from flask import Flask, render_template, request, jsonify, send_file
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
@@ -15,6 +19,15 @@ store = {
'filename': None,
}
upload_progress = {
'active': False,
'stage': '',
'total': 0,
'current': 0,
'message': '',
'logs': [],
}
def tab_data(tab_id):
t = store['tabs'].get(tab_id)
@@ -24,23 +37,217 @@ def tab_data(tab_id):
GEO_CACHE = {}
GEO_STATS = {'success': 0, 'failed': 0, 'failed_addrs': []}
def geocode_with_fallback(address):
if address in GEO_CACHE:
return GEO_CACHE[address]
import time, re
KAKAO_API_KEY = os.environ.get('KAKAO_API_KEY', '')
VWORLD_API_KEY = os.environ.get('VWORLD_API_KEY', 'E1FD0345-8021-3C9C-A397-CE0D88736D78')
def geocode_kakao(address, timeout=10):
if not KAKAO_API_KEY:
return None
try:
url = 'https://nominatim.openstreetmap.org/search'
params = {'q': address.strip(), 'format': 'json', 'limit': 1, 'countrycodes': 'kr'}
headers = {'User-Agent': 'MapinDrag/1.0'}
resp = requests.get(url, params=params, headers=headers, timeout=5)
url = 'https://dapi.kakao.com/v2/local/search/address.json'
headers = {'Authorization': f'KakaoAK {KAKAO_API_KEY}'}
params = {'query': address.strip(), 'size': 1}
resp = requests.get(url, params=params, headers=headers, timeout=timeout)
if resp.ok:
data = resp.json()
if data:
lat, lng = float(data[0]['lat']), float(data[0]['lon'])
GEO_CACHE[address] = (lat, lng)
return lat, lng
if data.get('documents'):
doc = data['documents'][0]
return float(doc['y']), float(doc['x'])
except Exception:
pass
return None
def looks_like_road_address(addr):
road_patterns = ['', '', '대로', '번길']
for p in road_patterns:
if p in addr and any(c.isdigit() for c in addr[addr.find(p):]):
return True
return False
def geocode_vworld(address, timeout=15):
if not VWORLD_API_KEY:
return None
addr = address.strip()
variants = [
addr,
f'서울특별시 용산구 {addr}' if '용산구' not in addr and '서울' not in addr else None,
f'용산구 {addr}' if '용산구' not in addr else None,
]
variants = [v for v in variants if v]
if looks_like_road_address(addr):
types = ['ROAD', 'PARCEL']
else:
types = ['PARCEL', 'ROAD']
for v in variants:
for addr_type in types:
try:
url = 'https://api.vworld.kr/req/address'
params = {
'service': 'address',
'request': 'getcoord',
'version': '2.0',
'crs': 'EPSG:4326',
'address': v,
'format': 'json',
'type': addr_type,
'key': VWORLD_API_KEY,
}
logger.debug(f' [VWorld 요청] type={addr_type}, address={v}')
resp = requests.get(url, params=params, timeout=timeout)
if not resp.ok:
logger.warning(f' [VWorld HTTP 오류] 상태={resp.status_code}, 응답={resp.text[:200]}')
continue
data = resp.json()
logger.debug(f' [VWorld 응답] {data}')
response = data.get('response', {})
status = response.get('status')
if status == 'OK' or status == 'SUCCESS':
result = response.get('result')
if result:
point = result.get('point')
if point and point.get('x') and point.get('y'):
lat = float(point['y'])
lng = float(point['x'])
logger.info(f' [VWorld 성공] {addr} (type={addr_type}) -> ({lat}, {lng})')
return lat, lng
else:
err_msg = response.get('error', {}).get('message', str(data))
logger.debug(f' [VWorld {addr_type} 실패] {v}: status={status}, 오류={err_msg}')
except Exception as e:
logger.warning(f' [VWorld 예외] {v}: {e}')
import traceback
traceback.print_exc()
return None
def normalize_korean_address(addr):
patterns = [
(r'서울시', '서울특별시'),
(r'경기도', '경기'),
(r'(\d+)동', r'\1동'),
]
for old, new in patterns:
addr = re.sub(old, new, addr)
return addr
def geocode_nominatim(address, timeout=10):
addr = normalize_korean_address(address.strip())
variants = [
addr,
f'{addr}, 서울특별시, 대한민국',
f'{addr}, 대한민국',
]
for v in variants:
try:
url = 'https://nominatim.openstreetmap.org/search'
params = {
'q': v,
'format': 'json',
'limit': 3,
'countrycodes': 'kr',
'accept-language': 'ko,en',
}
headers = {'User-Agent': 'MapinDrag/1.0 (non-commercial research)'}
resp = requests.get(url, params=params, headers=headers, timeout=timeout)
if resp.ok:
results = resp.json()
if results:
for r in results:
lat = float(r['lat'])
lng = float(r['lon'])
if 37 < lat < 38 and 126.9 < lng < 127.1:
return lat, lng
except Exception:
pass
return None
def geocode_with_fallback(address):
address = str(address or '').strip()
if not address:
jitter = random.uniform(-0.005, 0.005)
return (YONGSAN_CENTER[0] + jitter, YONGSAN_CENTER[1] + jitter)
if address in GEO_CACHE:
return GEO_CACHE[address]
addr_clean = address
prefixes = ['서울특별시 ', '서울시 ', '서울 ', '용산구 ']
for p in prefixes:
if addr_clean.startswith(p):
addr_clean = addr_clean[len(p):]
break
variants = []
if '용산구' not in address and '서울' not in address:
variants.append(f'서울특별시 용산구 ' + addr_clean)
variants.append(f'용산구 ' + addr_clean)
variants.append(address)
if '서울' in address and '용산구' not in address:
new_addr = address.replace('서울특별시', '서울특별시 용산구').replace('서울시', '서울시 용산구')
variants.append(new_addr)
unique_vars = []
seen = set()
for v in variants:
if v not in seen:
seen.add(v)
unique_vars.append(v)
result = None
used_api = None
api_order = []
if KAKAO_API_KEY:
api_order.append(('카카오', geocode_kakao))
if VWORLD_API_KEY:
api_order.append(('VWorld', geocode_vworld))
api_order.append(('Nominatim', geocode_nominatim))
for api_name, geo_func in api_order:
if result:
break
for v in unique_vars:
try:
result = geo_func(v)
if result:
used_api = api_name
break
except Exception as e:
logger.debug(f'{api_name} 시도 실패 ({v}): {e}')
continue
if result:
GEO_STATS['success'] += 1
logger.info(f' [지오코딩 성공 ({used_api})] {address} -> {result}')
GEO_CACHE[address] = result
return result
GEO_STATS['failed'] += 1
if len(GEO_STATS['failed_addrs']) < 20:
GEO_STATS['failed_addrs'].append(address)
logger.warning(f' [지오코딩 실패 - 폴백 사용] {address}')
jitter = random.uniform(-0.005, 0.005)
result = (YONGSAN_CENTER[0] + jitter, YONGSAN_CENTER[1] + jitter)
GEO_CACHE[address] = result
@@ -62,6 +269,28 @@ def list_tabs():
return jsonify({'tabs': tabs, 'activeTab': store['active_tab']})
@app.route('/api/upload-progress')
def get_upload_progress():
return jsonify({
'active': upload_progress['active'],
'stage': upload_progress['stage'],
'total': upload_progress['total'],
'current': upload_progress['current'],
'message': upload_progress['message'],
'logs': upload_progress['logs'][-15:],
})
def update_progress(stage, message='', total=0, current=0):
upload_progress['stage'] = stage
upload_progress['message'] = message
upload_progress['total'] = total
upload_progress['current'] = current
if message:
upload_progress['logs'].append(message)
logger.info(f'[진행: {stage}] {message}')
@app.route('/api/tabs/add', methods=['POST'])
def add_tab():
body = request.get_json() or {}
@@ -129,6 +358,29 @@ def get_data():
})
def find_header_idx(headers, keywords):
for kw in keywords:
for i, h in enumerate(headers):
if kw in h:
return i
return None
def is_likely_worker_sheet(sheet_name, headers):
sheet_lower = sheet_name.lower()
if any(kw in sheet_lower for kw in ['근무자', '참관인', '인력', '선거인', 'worker']):
return True
if any(kw in sheet_lower for kw in ['근무처', '투표소', '장소', '기관', 'workplace', 'poll']):
return False
hope_idx = find_header_idx(headers, ['희망사항', '희망', '배치희망'])
dob_idx = find_header_idx(headers, ['생년월일', '생일', '주민번호'])
phone_idx = find_header_idx(headers, ['연락처', '전화', '휴대폰', '핸드폰'])
worker_indicators = sum(1 for x in [hope_idx, dob_idx, phone_idx] if x is not None)
return worker_indicators >= 2
@app.route('/api/upload', methods=['POST'])
def upload():
tid = request.form.get('tab') or store['active_tab']
@@ -140,70 +392,175 @@ def upload():
if not file:
return jsonify({'error': '파일이 없습니다.'}), 400
upload_progress.update({
'active': True,
'stage': '시작',
'total': 0,
'current': 0,
'message': '',
'logs': [],
})
update_progress('파일 로딩', f'파일을 읽는 중: {file.filename}')
path = os.path.join(app.config['UPLOAD_FOLDER'], file.filename)
file.save(path)
wb = openpyxl.load_workbook(path, data_only=True)
logger.info(f'=== 엑셀 업로드 시작: {file.filename} ===')
logger.info(f'시트 목록: {wb.sheetnames}')
workers, workplaces, all_addrs = [], [], set()
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
headers = [str(c.value or '').strip() for c in ws[1]]
if '희망사항' in headers:
idx = {h: i for i, h in enumerate(headers)}
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): continue
name = str(row[idx.get('이름', 0)] or '').strip()
if name: all_addrs.add(str(row[idx.get('주소', 2)] or '').strip())
if '희망사항' not in headers:
idx = {h: i for i, h in enumerate(headers)}
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): continue
name = str(row[idx.get('이름', 0)] or '').strip()
if name: all_addrs.add(str(row[idx.get('주소', 1)] or '').strip())
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exc:
geo_results = {a: exc.submit(geocode_with_fallback, a) for a in all_addrs}
geo_results = {a: f.result() for a, f in geo_results.items()}
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
headers = [str(c.value or '').strip() for c in ws[1]]
logger.info(f' 시트: {sheet_name}, 헤더: {headers}')
update_progress('엑셀 파싱', f'시트 처리 중: {sheet_name}')
if '희망사항' in headers:
idx = {h: i for i, h in enumerate(headers)}
hope_idx = find_header_idx(headers, ['희망사항', '희망', '배치희망'])
is_worker_sheet = hope_idx is not None
name_idx = find_header_idx(headers, ['이름', '성명', '명칭', '근무지명', '투표소명'])
addr_idx = find_header_idx(headers, ['주소', '위치', '소재지'])
logger.info(f' → 희망사항: {hope_idx}, 이름: {name_idx}, 주소: {addr_idx}, 근무자시트: {is_worker_sheet}')
if is_worker_sheet:
hope_idx = hope_idx if hope_idx is not None else 1
name_idx = name_idx if name_idx is not None else 0
dob_idx = find_header_idx(headers, ['생년월일', '생일']) or 3
phone_idx = find_header_idx(headers, ['연락처', '전화', '휴대폰']) or 4
if addr_idx is None:
addr_idx = 2
logger.warning(f' [주의] 주소 컬럼을 찾을 수 없어 기본 인덱스 {addr_idx} 사용')
row_count = 0
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): continue
name = str(row[idx.get('이름', 0)] or '').strip()
name = str(row[name_idx] or '').strip() if name_idx is not None else ''
if not name: continue
addr = str(row[idx.get('주소', 2)] or '').strip()
lat, lng = geo_results[addr]
addr = str(row[addr_idx] or '').strip()
if addr:
all_addrs.add(addr)
row_count += 1
logger.info(f' → 근무자 행 수: {row_count}')
else:
name_idx = name_idx if name_idx is not None else 0
if addr_idx is None:
addr_idx = 1
logger.warning(f' [주의] 주소 컬럼을 찾을 수 없어 기본 인덱스 {addr_idx} 사용')
row_count = 0
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): continue
name = str(row[name_idx] or '').strip() if name_idx is not None else ''
if not name: continue
addr = str(row[addr_idx] or '').strip()
if addr:
all_addrs.add(addr)
row_count += 1
logger.info(f' → 근무처 행 수: {row_count}')
total_addrs = len(all_addrs)
logger.info(f'수집된 고유 주소 수: {total_addrs}')
GEO_STATS['success'] = 0
GEO_STATS['failed'] = 0
GEO_STATS['failed_addrs'] = []
if VWORLD_API_KEY:
api_name = 'VWorld API'
delay = 0.1
elif KAKAO_API_KEY:
api_name = '카카오 API'
delay = 0.1
else:
api_name = 'Nominatim (API 키 권장)'
logger.info(' (참고: VWorld API 키 발급받으시면 정확도와 속도가 크게 향상됩니다)')
delay = 1.0
logger.info(f'=== 지오코딩 시작 ({api_name}) ===')
update_progress('지오코딩', f'주소를 좌표로 변환 중 ({api_name})', total=total_addrs, current=0)
geo_results = {}
for idx, addr in enumerate(all_addrs, 1):
msg = f'[{idx}/{total_addrs}] {addr}'
logger.info(f' {msg}')
geo_results[addr] = geocode_with_fallback(addr)
if idx % 5 == 0 or idx == total_addrs:
update_progress('지오코딩', f'좌표 변환 중... {idx}/{total_addrs}', total=total_addrs, current=idx)
if idx < total_addrs:
time.sleep(delay)
logger.info(f'=== 지오코딩 완료: 성공 {GEO_STATS["success"]}, 실패 {GEO_STATS["failed"]} ===')
if GEO_STATS['failed_addrs']:
logger.warning(f'실패한 주소 목록 (최대 20개): {GEO_STATS["failed_addrs"]}')
update_progress('완료', f'처리 완료: 근무자 {len(workers or [])}명, 근무처 {len(workplaces or [])}')
upload_progress['active'] = False
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
headers = [str(c.value or '').strip() for c in ws[1]]
hope_idx = find_header_idx(headers, ['희망사항', '희망', '배치희망'])
is_worker_sheet = hope_idx is not None
name_idx = find_header_idx(headers, ['이름', '성명', '명칭', '근무지명', '투표소명'])
addr_idx = find_header_idx(headers, ['주소', '위치', '소재지'])
if is_worker_sheet:
hope_idx = hope_idx if hope_idx is not None else 1
name_idx = name_idx if name_idx is not None else 0
dob_idx = find_header_idx(headers, ['생년월일', '생일']) or 3
phone_idx = find_header_idx(headers, ['연락처', '전화', '휴대폰']) or 4
if addr_idx is None:
addr_idx = 2
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): continue
name = str(row[name_idx] or '').strip() if name_idx is not None else ''
if not name: continue
addr = str(row[addr_idx] or '').strip()
lat, lng = geo_results.get(addr, (YONGSAN_CENTER[0] + random.uniform(-0.005, 0.005),
YONGSAN_CENTER[1] + random.uniform(-0.005, 0.005)))
workers.append({
'id': f'w{len(workers)}', 'name': name,
'hope': str(row[idx.get('희망사항', 1)] or '').strip(),
'hope': str(row[hope_idx] or '').strip(),
'address': addr,
'dob': str(row[idx.get('생년월일', 3)] or '').strip(),
'phone': str(row[idx.get('연락처', 4)] or '').strip(),
'dob': str(row[dob_idx] or '').strip(),
'phone': str(row[phone_idx] or '').strip(),
'lat': lat, 'lng': lng,
})
else:
name_idx = name_idx if name_idx is not None else 0
if addr_idx is None:
addr_idx = 1
if '희망사항' not in headers:
idx = {h: i for i, h in enumerate(headers)}
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): continue
name = str(row[idx.get('이름', 0)] or '').strip()
name = str(row[name_idx] or '').strip() if name_idx is not None else ''
if not name: continue
addr = str(row[idx.get('주소', 1)] or '').strip()
lat, lng = geo_results[addr]
addr = str(row[addr_idx] or '').strip()
lat, lng = geo_results.get(addr, (YONGSAN_CENTER[0] + random.uniform(-0.005, 0.005),
YONGSAN_CENTER[1] + random.uniform(-0.005, 0.005)))
workplaces.append({
'id': f'p{len(workplaces)}', 'name': name,
'address': addr, 'lat': lat, 'lng': lng,
})
logger.info(f'=== 업로드 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}개 ===')
t['workers'] = workers
t['workplaces'] = workplaces
t['assignments'] = {}
store['filename'] = file.filename
update_progress('완료', f'처리 완료: 근무자 {len(workers)}명, 근무처 {len(workplaces)}')
upload_progress['active'] = False
return jsonify({'workers': workers, 'workplaces': workplaces, 'assignments': {}})