ГАР-ФИАС
Формирование CSV из общего архива
import os
import csv
import zipfile
import logging
import gc
from lxml import etree
# --- КОНФИГУРАЦИЯ ---
ZIP_PATH = r'D:\Projects\GAR\gar_xml.zip'
OUTPUT_FILE = r'D:\Projects\GAR\gar_parser\gar_full_data.csv'
# REGION_CODE = '77' # '77' для Москвы, None для ВСЕХ регионов
REGION_CODE = '77'
# ----------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_file_from_zip(zip_file, pattern):
"""Находит файл в архиве по шаблону."""
for name in zip_file.namelist():
if pattern in name and name.endswith('.XML'):
return name
return None
def parse_house_types(zip_ref):
"""Возвращает словарь {id: name} для типов домов"""
filename = get_file_from_zip(zip_ref, 'AS_HOUSE_TYPES_')
types_map = {}
if filename:
logging.info(f"Загрузка типов домов из {filename}...")
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSETYPE')
for event, elem in context:
types_map[elem.get('ID')] = elem.get('SHORTNAME') or elem.get('NAME')
elem.clear()
return types_map
def parse_addhouse_types(zip_ref):
"""Возвращает словарь {id: name} для дополнительных типов домов (корпус, строение)"""
filename = get_file_from_zip(zip_ref, 'AS_ADDHOUSE_TYPES_')
types_map = {}
if filename:
logging.info(f"Загрузка доп. типов домов из {filename}...")
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSETYPE')
for event, elem in context:
types_map[elem.get('ID')] = elem.get('SHORTNAME') or elem.get('NAME')
elem.clear()
return types_map
def process_region(zip_ref, region, writer, house_types, addhouse_types):
"""Обрабатывает один регион и записывает данные в CSV"""
# 1. Загрузка адресных объектов (Улицы, Города) -> В память
# Нам нужно: OBJECTID -> (OBJECTGUID, NAME, TYPENAME)
addr_file = get_file_from_zip(zip_ref, f'{region}/AS_ADDR_OBJ_')
if not addr_file:
logging.warning(f"Пропуск региона {region}: AS_ADDR_OBJ не найден.")
return
logging.info(f"[{region}] Загрузка адресных объектов...")
addr_map = {} # {id: (guid, name, typename)}
region_name = f"Регион {region}" # Значение по умолчанию
with zip_ref.open(addr_file) as f:
context = etree.iterparse(f, events=('end',), tag='OBJECT')
for event, elem in context:
if elem.get('ISACTUAL') == '1' and elem.get('ISACTIVE') == '1':
# Уровень 1 обычно сам Регион (например "Москва", "Свердловская обл")
if elem.get('LEVEL') == '1':
# Формат: Тип Название (например, "г Москва")
region_name = f"{elem.get('TYPENAME')} {elem.get('NAME')}"
addr_map[elem.get('OBJECTID')] = (
elem.get('OBJECTGUID'),
elem.get('NAME'),
elem.get('TYPENAME')
)
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
logging.info(f"[{region}] Обнаружено название региона: {region_name}")
if not addr_map:
logging.warning(f"[{region}] Активные адресные объекты не найдены.")
return
# 2. Загрузка домов -> В память
# Нам нужно: OBJECTID -> (OBJECTGUID, HOUSENUM, ADDNUM1, ADDNUM2, HOUSETYPE_ID, ADDTYPE1, ADDTYPE2)
house_file = get_file_from_zip(zip_ref, f'{region}/AS_HOUSES_')
if not house_file:
logging.warning(f"Пропуск региона {region}: AS_HOUSES не найден.")
return
logging.info(f"[{region}] Загрузка домов...")
house_map = {} # {id: (guid, num, add1, add2, type_name, addtype1_name, addtype2_name)}
with zip_ref.open(house_file) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSE')
for event, elem in context:
if elem.get('ISACTUAL') == '1' and elem.get('ISACTIVE') == '1':
ht_id = elem.get('HOUSETYPE')
ht_name = house_types.get(ht_id, ht_id)
at1_id = elem.get('ADDTYPE1')
at1_name = addhouse_types.get(at1_id, '')
at2_id = elem.get('ADDTYPE2')
at2_name = addhouse_types.get(at2_id, '')
house_map[elem.get('OBJECTID')] = (
elem.get('OBJECTGUID'),
elem.get('HOUSENUM'),
elem.get('ADDNUM1'),
elem.get('ADDNUM2'),
ht_name,
at1_name,
at2_name
)
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
if not house_map:
logging.info(f"[{region}] Активные дома не найдены.")
del addr_map
return
# 3. Потоковая обработка иерархии и запись в CSV
# Связь: Дом(OBJECTID) -> Улица(PARENTOBJID)
hier_file = get_file_from_zip(zip_ref, f'{region}/AS_ADM_HIERARCHY_')
if not hier_file:
logging.warning(f"Пропуск региона {region}: AS_ADM_HIERARCHY не найден.")
return
logging.info(f"[{region}] Связывание иерархии и запись в CSV...")
count = 0
with zip_ref.open(hier_file) as f:
context = etree.iterparse(f, events=('end',), tag='ITEM')
for event, elem in context:
if elem.get('ISACTIVE') == '1':
obj_id = elem.get('OBJECTID')
parent_id = elem.get('PARENTOBJID')
# Проверяем, является ли это связью Дом -> Улица
if obj_id in house_map and parent_id in addr_map:
house = house_map[obj_id]
street = addr_map[parent_id]
# Строка: RegionCode, RegionName, StreetGUID, StreetType, StreetName, HSGUID, HouseType, HouseNum, AddType1, AddNum1, AddType2, AddNum2
writer.writerow([
region,
region_name,
street[0], # Street GUID
street[2], # Street Type (например, ул)
street[1], # Street Name (например, Ленина)
house[0], # House GUID
house[4], # House Type Name (например, д.)
house[1], # House Num
house[5], # Add Type 1 (например, к.)
house[2], # Add Num 1
house[6], # Add Type 2
house[3] # Add Num 2
])
count += 1
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
logging.info(f"[{region}] Завершено. Сохранено {count} записей.")
# Очистка памяти
del addr_map
del house_map
gc.collect()
def main():
output_dir = os.path.dirname(OUTPUT_FILE)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
logging.info(f"Открытие архива: {ZIP_PATH}")
with zipfile.ZipFile(ZIP_PATH, 'r') as z:
# 1. Загрузка глобальных справочников
house_types = parse_house_types(z)
addhouse_types = parse_addhouse_types(z)
# 2. Определение регионов
if REGION_CODE:
regions = [REGION_CODE]
else:
logging.info("Автоопределение регионов...")
# Ищем папки вида '01/', '77/'
dirs = set(name.split('/')[0] for name in z.namelist() if name[0].isdigit())
regions = sorted([d for d in dirs if d.isdigit()])
logging.info(f"Найдено {len(regions)} регионов.")
# 3. Открытие CSV на запись
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# Заголовок
writer.writerow([
'region_code',
'region_name',
'aoguid',
'street_type',
'street_name',
'hsguid',
'housetype',
'housenum',
'addtype1',
'addnum1',
'addtype2',
'addnum2'
])
# 4. Обработка
for reg in regions:
try:
process_region(z, reg, writer, house_types, addhouse_types)
except Exception as e:
logging.error(f"Ошибка обработки региона {reg}: {e}")
logging.info(f"Всё готово. Файл сохранен: {OUTPUT_FILE}")
if __name__ == '__main__':
main()
Для загрузки в БД и обновления по дельтам
setup_db.sql
-- 1. Создание сырых таблиц (для хранения "как есть" из XML)
CREATE TABLE IF NOT EXISTS gar_raw_addr_objs (
objectid BIGINT PRIMARY KEY,
objectguid UUID,
name VARCHAR(255),
typename VARCHAR(50),
level INT,
region_code VARCHAR(10)
);
CREATE TABLE IF NOT EXISTS gar_raw_houses (
objectid BIGINT PRIMARY KEY,
objectguid UUID,
housenum VARCHAR(50),
addnum1 VARCHAR(50),
addnum2 VARCHAR(50),
housetype VARCHAR(50),
addtype1 VARCHAR(50),
addtype2 VARCHAR(50),
region_code VARCHAR(10)
);
CREATE TABLE IF NOT EXISTS gar_raw_hierarchy (
objectid BIGINT,
parentobjid BIGINT,
region_code VARCHAR(10),
PRIMARY KEY (objectid, parentobjid)
);
CREATE INDEX IF NOT EXISTS idx_raw_hierarchy_parent ON gar_raw_hierarchy(parentobjid);
CREATE INDEX IF NOT EXISTS idx_raw_hierarchy_object ON gar_raw_hierarchy(objectid);
-- 2. Создание итоговой плоской таблицы (для поиска)
CREATE TABLE IF NOT EXISTS gar_addresses (
region_code VARCHAR(10),
region_name VARCHAR(255),
street_aoguid UUID,
street_type VARCHAR(50),
street_name VARCHAR(255),
house_guid UUID,
house_type VARCHAR(50),
house_num VARCHAR(50),
add_type1 VARCHAR(50),
add_num1 VARCHAR(50),
add_type2 VARCHAR(50),
add_num2 VARCHAR(50)
);
CREATE INDEX IF NOT EXISTS idx_gar_addr_street_guid ON gar_addresses(street_aoguid);
CREATE INDEX IF NOT EXISTS idx_gar_addr_house_guid ON gar_addresses(house_guid);
CREATE INDEX IF NOT EXISTS idx_gar_addr_region ON gar_addresses(region_code);
-- 3. Процедура полной пересборки плоской таблицы из сырых данных
-- Вызывается после первичной загрузки или после наката дельты
CREATE OR REPLACE PROCEDURE rebuild_gar_flat_table()
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Очистка таблицы gar_addresses...';
TRUNCATE TABLE gar_addresses;
RAISE NOTICE 'Сборка плоской таблицы...';
INSERT INTO gar_addresses (
region_code,
region_name,
street_aoguid,
street_type,
street_name,
house_guid,
house_type,
house_num,
add_type1,
add_num1,
add_type2,
add_num2
)
SELECT
h.region_code,
(r.typename || ' ' || r.name) as region_name, -- Подтягиваем имя региона (уровень 1)
s.objectguid as street_guid,
s.typename as street_type,
s.name as street_name,
h.objectguid as house_guid,
h.housetype,
h.housenum,
h.addtype1,
h.addnum1,
h.addtype2,
h.addnum2
FROM gar_raw_houses h
-- Связь Дом -> Улица
JOIN gar_raw_hierarchy link ON h.objectid = link.objectid
JOIN gar_raw_addr_objs s ON link.parentobjid = s.objectid
-- Ищем имя региона (это может быть медленно, можно оптимизировать)
LEFT JOIN gar_raw_addr_objs r ON r.level = 1 AND r.region_code = h.region_code
WHERE s.level > 1; -- Исключаем сам регион как улицу
RAISE NOTICE 'Обновление Materialized View mv_gar_streets...';
REFRESH MATERIALIZED VIEW mv_gar_streets;
RAISE NOTICE 'Пересборка завершена.';
END;
$$;
-- 4. Включаем расширение для быстрого поиска текста (Trigrams)
CREATE EXTENSION IF NOT EXISTS pg_trgm;
-- 5. Создаем уникальный индекс улиц для автодополнения (Autocomplete)
-- Использование Materialized View или отдельной таблицы намного быстрее, чем SELECT DISTINCT по большой таблице
DROP MATERIALIZED VIEW IF EXISTS mv_gar_streets;
CREATE MATERIALIZED VIEW mv_gar_streets AS
SELECT DISTINCT
street_aoguid,
street_name,
street_type,
region_code,
region_name,
(street_type || ' ' || street_name || ', ' || region_name) as full_name
FROM gar_addresses;
CREATE INDEX idx_mv_streets_trgm ON mv_gar_streets USING gin (street_name gin_trgm_ops);
CREATE INDEX idx_mv_streets_guid ON mv_gar_streets (street_aoguid);
-- 6. Функция для автодополнения улиц (Ввод: "Ленин")
-- Возвращает: Список улиц
DROP FUNCTION IF EXISTS api_suggest_streets(text, text);
CREATE OR REPLACE FUNCTION api_suggest_streets(search_text text, p_region_code text DEFAULT NULL)
RETURNS TABLE (
aoguid uuid,
name varchar,
typename varchar,
region varchar,
full_title text
) AS $$
BEGIN
RETURN QUERY
SELECT
s.street_aoguid,
s.street_name,
s.street_type,
s.region_name,
s.full_name
FROM mv_gar_streets s
WHERE (s.street_name ILIKE search_text || '%' -- Поиск по началу (Быстро)
OR s.street_name ILIKE '%' || search_text || '%') -- Нечеткий поиск (Медленнее, использует trgm)
AND (p_region_code IS NULL OR s.region_code = p_region_code)
LIMIT 20;
END;
$$ LANGUAGE plpgsql;
-- 7. Функция для получения UUID дома по GUID улицы + Номеру дома
-- Собирает полный адрес (д. 10 к. 2 стр. 1)
CREATE OR REPLACE FUNCTION api_get_house_guids(p_street_guid uuid, p_house_num text)
RETURNS TABLE (
house_guid uuid,
full_number text,
type_name text
) AS $$
BEGIN
RETURN QUERY
SELECT
h.house_guid,
(
COALESCE(h.house_num, '') ||
CASE WHEN h.add_num1 IS NOT NULL THEN ' ' || COALESCE(h.add_type1, '') || h.add_num1 ELSE '' END ||
CASE WHEN h.add_num2 IS NOT NULL THEN ' ' || COALESCE(h.add_type2, '') || h.add_num2 ELSE '' END
) as num,
h.house_type
FROM gar_addresses h
WHERE h.street_aoguid = p_street_guid
AND h.house_num = p_house_num;
END;
$$ LANGUAGE plpgsql;
0_init_db.py
import psycopg2
import logging
import os
# --- КОНФИГУРАЦИЯ ---
DB_CONFIG = {
'dbname': 'gar',
'user': 'andedali',
'password': 'pass',
'host': 'localhost',
'port': '5432'
}
SQL_FILE = r'D:\Projects\GAR\gar_parser\setup_db.sql'
# ----------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
if not os.path.exists(SQL_FILE):
logging.error(f"Файл SQL не найден: {SQL_FILE}")
return
try:
conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = True
cur = conn.cursor()
logging.info("Чтение и выполнение SQL скрипта создания таблиц и функций...")
with open(SQL_FILE, 'r', encoding='utf-8') as f:
sql = f.read()
cur.execute(sql)
logging.info("Таблицы и функции успешно созданы.")
conn.close()
except Exception as e:
logging.error(f"Ошибка при создании таблиц и функций: {e}")
if __name__ == '__main__':
main()
gar_to_raw_csvs.py
import os
import csv
import zipfile
import logging
import gc
from lxml import etree
# --- КОНФИГУРАЦИЯ ---
ZIP_PATH = r'D:\Projects\GAR\gar_xml.zip'
OUTPUT_DIR = r'D:\Projects\GAR\gar_parser\raw_csv_output'
REGION_CODE = '77' # '77' для Москвы, None для ВСЕХ регионов
# ----------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_file_from_zip(zip_file, pattern):
for name in zip_file.namelist():
if pattern in name and name.endswith('.XML'):
return name
return None
def parse_house_types(zip_ref):
"""Словарь {id: name} для типов домов"""
filename = get_file_from_zip(zip_ref, 'AS_HOUSE_TYPES_')
types_map = {}
if filename:
logging.info(f"Загрузка типов домов из {filename}...")
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSETYPE')
for event, elem in context:
types_map[elem.get('ID')] = elem.get('SHORTNAME') or elem.get('NAME')
elem.clear()
return types_map
def parse_addhouse_types(zip_ref):
"""Словарь {id: name} для доп. типов"""
filename = get_file_from_zip(zip_ref, 'AS_ADDHOUSE_TYPES_')
types_map = {}
if filename:
logging.info(f"Загрузка доп. типов домов из {filename}...")
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSETYPE')
for event, elem in context:
types_map[elem.get('ID')] = elem.get('SHORTNAME') or elem.get('NAME')
elem.clear()
return types_map
def process_region(zip_ref, region, writers, house_types, addhouse_types):
# 1. Address Objects (Улицы, Города)
addr_file = get_file_from_zip(zip_ref, f'{region}/AS_ADDR_OBJ_')
if addr_file:
logging.info(f"[{region}] Парсинг адресных объектов...")
with zip_ref.open(addr_file) as f:
context = etree.iterparse(f, events=('end',), tag='OBJECT')
for event, elem in context:
if elem.get('ISACTUAL') == '1' and elem.get('ISACTIVE') == '1':
writers['addr_objs'].writerow([
elem.get('OBJECTID'),
elem.get('OBJECTGUID'),
elem.get('NAME'),
elem.get('TYPENAME'),
elem.get('LEVEL'),
region # Добавляем код региона для удобства
])
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
# 2. Houses (Дома)
house_file = get_file_from_zip(zip_ref, f'{region}/AS_HOUSES_')
if house_file:
logging.info(f"[{region}] Парсинг домов...")
with zip_ref.open(house_file) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSE')
for event, elem in context:
if elem.get('ISACTUAL') == '1' and elem.get('ISACTIVE') == '1':
ht_name = house_types.get(elem.get('HOUSETYPE'), elem.get('HOUSETYPE'))
at1_name = addhouse_types.get(elem.get('ADDTYPE1'), '')
at2_name = addhouse_types.get(elem.get('ADDTYPE2'), '')
writers['houses'].writerow([
elem.get('OBJECTID'),
elem.get('OBJECTGUID'),
elem.get('HOUSENUM'),
elem.get('ADDNUM1'),
elem.get('ADDNUM2'),
ht_name,
at1_name,
at2_name,
region
])
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
# 3. Hierarchy (Связи)
hier_file = get_file_from_zip(zip_ref, f'{region}/AS_ADM_HIERARCHY_')
if hier_file:
logging.info(f"[{region}] Парсинг иерархии...")
with zip_ref.open(hier_file) as f:
context = etree.iterparse(f, events=('end',), tag='ITEM')
for event, elem in context:
if elem.get('ISACTIVE') == '1':
writers['hierarchy'].writerow([
elem.get('OBJECTID'),
elem.get('PARENTOBJID'),
region
])
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
def main():
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# Открываем 3 CSV файла на запись
files = {
'addr_objs': open(os.path.join(OUTPUT_DIR, 'raw_addr_objs.csv'), 'w', newline='', encoding='utf-8'),
'houses': open(os.path.join(OUTPUT_DIR, 'raw_houses.csv'), 'w', newline='', encoding='utf-8'),
'hierarchy': open(os.path.join(OUTPUT_DIR, 'raw_hierarchy.csv'), 'w', newline='', encoding='utf-8')
}
writers = {
'addr_objs': csv.writer(files['addr_objs']),
'houses': csv.writer(files['houses']),
'hierarchy': csv.writer(files['hierarchy'])
}
# Заголовки
writers['addr_objs'].writerow(['objectid', 'objectguid', 'name', 'typename', 'level', 'region_code'])
writers['houses'].writerow(['objectid', 'objectguid', 'housenum', 'addnum1', 'addnum2', 'housetype', 'addtype1', 'addtype2', 'region_code'])
writers['hierarchy'].writerow(['objectid', 'parentobjid', 'region_code'])
logging.info(f"Открытие архива: {ZIP_PATH}")
with zipfile.ZipFile(ZIP_PATH, 'r') as z:
house_types = parse_house_types(z)
addhouse_types = parse_addhouse_types(z)
if REGION_CODE:
regions = [REGION_CODE]
else:
dirs = set(name.split('/')[0] for name in z.namelist() if name[0].isdigit())
regions = sorted([d for d in dirs if d.isdigit()])
for reg in regions:
try:
process_region(z, reg, writers, house_types, addhouse_types)
except Exception as e:
logging.error(f"Ошибка в регионе {reg}: {e}")
# Закрываем файлы
for f in files.values():
f.close()
logging.info("Парсинг завершен. Сырые CSV файлы созданы.")
if __name__ == '__main__':
main()
gar_load_raw.py
import os
import psycopg2
import logging
import time
# --- КОНФИГУРАЦИЯ ---
DB_CONFIG = {
'dbname': 'gar',
'user': 'andedali',
'password': 'pass',
'host': 'localhost',
'port': '5432'
}
CSV_DIR = r'D:\Projects\GAR\gar_parser\raw_csv_output'
# ----------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def load_csv(cursor, table, filename):
path = os.path.join(CSV_DIR, filename)
if not os.path.exists(path):
logging.warning(f"Файл {path} не найден.")
return
logging.info(f"Загрузка {table}...")
with open(path, 'r', encoding='utf-8') as f:
next(f) # Skip header
cursor.copy_expert(f"COPY {table} FROM STDIN WITH CSV DELIMITER ',' NULL ''", f)
def main():
try:
conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = True
cur = conn.cursor()
# 1. Загрузка сырых данных
start = time.time()
load_csv(cur, 'gar_raw_addr_objs', 'raw_addr_objs.csv')
load_csv(cur, 'gar_raw_houses', 'raw_houses.csv')
load_csv(cur, 'gar_raw_hierarchy', 'raw_hierarchy.csv')
logging.info(f"Загрузка завершена за {time.time() - start:.2f} сек.")
# 2. Сборка плоской таблицы
logging.info("Запуск процедуры сборки плоской таблицы...")
cur.execute("CALL rebuild_gar_flat_table();")
logging.info("Плоская таблица собрана.")
cur.execute("VACUUM ANALYZE;")
conn.close()
except Exception as e:
logging.error(f"Ошибка БД: {e}")
if __name__ == '__main__':
main()
gar_apply_delta.py
import os
import psycopg2
import logging
import zipfile
from lxml import etree
# --- КОНФИГУРАЦИЯ ---
DELTA_ZIP = r'D:\Projects\GAR\gar_delta_xml.zip'
REGION_CODE = '77'
DB_CONFIG = {
'dbname': 'gar',
'user': 'andedali',
'password': 'pass',
'host': 'localhost',
'port': '5432'
}
# ----------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_file_from_zip(zip_file, pattern):
for name in zip_file.namelist():
if pattern in name and name.endswith('.XML'):
return name
return None
def process_delta_houses(cur, zip_ref, region):
filename = get_file_from_zip(zip_ref, f'{region}/AS_HOUSES_')
if not filename: return
logging.info(f"Обработка дельты домов: {filename}")
stats = {'inserted': 0, 'deleted': 0}
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='HOUSE')
for event, elem in context:
obj_id = int(elem.get('OBJECTID'))
is_active = elem.get('ISACTIVE') == '1'
if not is_active:
# Удаление (или деактивация)
cur.execute("DELETE FROM gar_raw_houses WHERE objectid = %s", (obj_id,))
stats['deleted'] += 1
else:
# Вставка / Обновление (UPSERT)
cur.execute("DELETE FROM gar_raw_houses WHERE objectid = %s", (obj_id,))
cur.execute("""
INSERT INTO gar_raw_houses (objectid, objectguid, housenum, addnum1, addnum2, housetype, addtype1, addtype2, region_code)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
obj_id,
elem.get('OBJECTGUID'),
elem.get('HOUSENUM'),
elem.get('ADDNUM1'),
elem.get('ADDNUM2'),
elem.get('HOUSETYPE'),
elem.get('ADDTYPE1'),
elem.get('ADDTYPE2'),
region
))
stats['inserted'] += 1
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
logging.info(f"Дома: добавлено/обновлено: {stats['inserted']}, удалено: {stats['deleted']}")
def process_delta_hierarchy(cur, zip_ref, region):
filename = get_file_from_zip(zip_ref, f'{region}/AS_ADM_HIERARCHY_')
if not filename: return
logging.info(f"Обработка дельты иерархии: {filename}")
stats = {'inserted': 0, 'deleted': 0}
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='ITEM')
for event, elem in context:
obj_id = int(elem.get('OBJECTID'))
parent_id = int(elem.get('PARENTOBJID'))
is_active = elem.get('ISACTIVE') == '1'
if not is_active:
cur.execute("DELETE FROM gar_raw_hierarchy WHERE objectid = %s AND parentobjid = %s", (obj_id, parent_id))
stats['deleted'] += 1
else:
# В иерархии PK составной (objectid, parentobjid), поэтому удаляем конкретную связку
cur.execute("DELETE FROM gar_raw_hierarchy WHERE objectid = %s AND parentobjid = %s", (obj_id, parent_id))
cur.execute("""
INSERT INTO gar_raw_hierarchy (objectid, parentobjid, region_code)
VALUES (%s, %s, %s)
""", (obj_id, parent_id, region))
stats['inserted'] += 1
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
logging.info(f"Иерархия: добавлено/обновлено: {stats['inserted']}, удалено: {stats['deleted']}")
def process_delta_addrobjs(cur, zip_ref, region):
filename = get_file_from_zip(zip_ref, f'{region}/AS_ADDR_OBJ_')
if not filename: return
logging.info(f"Обработка дельты адресных объектов: {filename}")
stats = {'inserted': 0, 'deleted': 0}
with zip_ref.open(filename) as f:
context = etree.iterparse(f, events=('end',), tag='OBJECT')
for event, elem in context:
obj_id = int(elem.get('OBJECTID'))
is_active = elem.get('ISACTIVE') == '1'
if not is_active:
cur.execute("DELETE FROM gar_raw_addr_objs WHERE objectid = %s", (obj_id,))
stats['deleted'] += 1
else:
cur.execute("DELETE FROM gar_raw_addr_objs WHERE objectid = %s", (obj_id,))
cur.execute("""
INSERT INTO gar_raw_addr_objs (objectid, objectguid, name, typename, level, region_code)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
obj_id,
elem.get('OBJECTGUID'),
elem.get('NAME'),
elem.get('TYPENAME'),
elem.get('LEVEL'),
region
))
stats['inserted'] += 1
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
logging.info(f"Адреса: добавлено/обновлено: {stats['inserted']}, удалено: {stats['deleted']}")
def main():
try:
conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = True
cur = conn.cursor()
logging.info(f"Открытие дельты: {DELTA_ZIP}")
with zipfile.ZipFile(DELTA_ZIP, 'r') as z:
# Тут можно добавить загрузку справочников типов для декодирования
# Обработка домов
process_delta_houses(cur, z, REGION_CODE)
# Обработка иерархии
process_delta_hierarchy(cur, z, REGION_CODE)
# Обработка адресных объектов
process_delta_addrobjs(cur, z, REGION_CODE)
logging.info("Дельта применена к сырым таблицам.")
logging.info("Пересборка плоской таблицы...")
cur.execute("CALL rebuild_gar_flat_table();")
logging.info("Готово.")
except Exception as e:
logging.error(f"Ошибка: {e}")
if __name__ == '__main__':
main()
SQL проверка количества строк и размера таблиц
SELECT
pt.schemaname,
pt.tablename,
COALESCE(ps.n_live_tup, 0) AS row_count,
pg_size_pretty(pg_relation_size(pt.schemaname||'.'||pt.tablename)) AS data_size,
pg_size_pretty(pg_indexes_size(pt.schemaname||'.'||pt.tablename)) AS indexes_size,
pg_size_pretty(pg_total_relation_size(pt.schemaname||'.'||pt.tablename)) AS total_size,
pg_relation_size(pt.schemaname||'.'||pt.tablename) AS data_size_bytes,
pg_indexes_size(pt.schemaname||'.'||pt.tablename) AS indexes_size_bytes,
pg_total_relation_size(pt.schemaname||'.'||pt.tablename) AS total_size_bytes,
ROUND(100.0 * pg_indexes_size(pt.schemaname||'.'||pt.tablename) / NULLIF(pg_total_relation_size(pt.schemaname||'.'||pt.tablename), 0), 2) AS indexes_percent
FROM pg_tables pt
LEFT JOIN pg_stat_user_tables ps ON ps.schemaname = pt.schemaname AND ps.relname = pt.tablename
WHERE pt.schemaname = 'public'
AND pt.tablename LIKE 'gar_%'
ORDER BY pg_total_relation_size(pt.schemaname||'.'||pt.tablename) DESC;
Комментарии
Комментариев пока нет.