296 lines
9.2 KiB
Python
296 lines
9.2 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from collections.abc import Sequence
|
||
|
||
import pandas as pd
|
||
|
||
from .classifier import CLASSES, address_classification
|
||
from .utils import any_of_in, create_token, is_valid_token, unfold_house_ranges
|
||
|
||
|
||
class AddressSplitter(Sequence):
|
||
def __init__(self, address: str):
|
||
self.input = address
|
||
|
||
self.addresses = self.split()
|
||
|
||
# Sequence abstract methods implementation
|
||
|
||
def __getitem__(self, key: int):
|
||
if key < len(self.addresses):
|
||
return self.addresses[key]
|
||
else:
|
||
raise IndexError()
|
||
|
||
def __len__(self):
|
||
return len(self.addresses)
|
||
|
||
# Address token class manipulations
|
||
|
||
def next_class(self) -> str:
|
||
return self.token["class"][0]
|
||
|
||
def prev_class(self) -> str:
|
||
return self.accumulator["class"][-1]
|
||
|
||
def correct_order(self) -> bool:
|
||
return (
|
||
len(self.accumulator["class"]) > 0
|
||
and CLASSES.index(self.prev_class()) < CLASSES.index(self.next_class())
|
||
and self.accumulator["class"] != "w"
|
||
)
|
||
|
||
def next_class_is(self, comparing_class: str) -> bool:
|
||
return len(self.token["class"]) > 0 and self.next_class() == comparing_class[0]
|
||
|
||
def has_no_class(self, comparing_class: str) -> bool:
|
||
return comparing_class[0] not in self.accumulator["class"]
|
||
|
||
def pop_token_class(self):
|
||
self.token["class"] = self.token["class"][1:]
|
||
|
||
# Accumulator constrains
|
||
|
||
def next_is_street_or_upper(self) -> bool:
|
||
is_unknown_class = self.accumulator["class"] in ("", "w")
|
||
|
||
return (
|
||
CLASSES.index(self.next_class()) <= CLASSES.index("s") or is_unknown_class
|
||
)
|
||
|
||
def has_numbered_street(self) -> bool:
|
||
return any_of_in(("-я", "-й", "-Я"), self.accumulator["address"])
|
||
|
||
# Accumulator manipulation
|
||
|
||
## House
|
||
|
||
def substitue_house(self) -> str:
|
||
house_regex = re.compile(r"\d{1,4} ?[\/\-]?\d* ?")
|
||
|
||
number = house_regex.findall(self.token["obj"])[0]
|
||
|
||
if self.has_numbered_street():
|
||
house_number_index = 1
|
||
else:
|
||
house_number_index = 0
|
||
|
||
number_in_accumulator = house_regex.findall(self.accumulator["address"])
|
||
|
||
if number_in_accumulator:
|
||
return re.sub(
|
||
number_in_accumulator[house_number_index],
|
||
number,
|
||
self.accumulator["address"],
|
||
)
|
||
else:
|
||
return self.accumulator["address"]
|
||
|
||
## Building
|
||
|
||
def append_building(self, number: int) -> pd.Series:
|
||
self.accumulator["class"] += "b"
|
||
self.accumulator["address"] += "к." + number
|
||
|
||
return self.accumulator
|
||
|
||
def substitue_building(self, number: int) -> str:
|
||
return re.sub(r"\d$", number, self.accumulator["address"])
|
||
|
||
def insert_building(self):
|
||
number = re.findall(r"\d", self.token["obj"])[-1]
|
||
|
||
if number and self.has_no_class("building"):
|
||
self.accumulator = self.append_building(number)
|
||
else:
|
||
self.accumulator["address"] = self.substitue_building(number)
|
||
|
||
## Edifice
|
||
|
||
def substitue_edifice(self, number: int) -> str:
|
||
return re.sub(r"cтр\. ?\d", number, self.accumulator["address"].strip())
|
||
|
||
def insert_edifice(self):
|
||
number = re.findall("стр\.? ?\d", self.token["obj"])[-1]
|
||
|
||
self.accumulator["address"] = self.substitue_edifice(number)
|
||
|
||
if number and self.has_no_class("edifice"):
|
||
self.accumulator["class"] += "e"
|
||
|
||
## Letter
|
||
|
||
def without_letter(self) -> str:
|
||
return re.sub(r"[А-Яа-я]$", "", self.accumulator["address"].strip())
|
||
|
||
def substitue_letter(self, letter: str) -> str:
|
||
address_without_letter = self.without_letter()
|
||
|
||
return address_without_letter + letter
|
||
|
||
def insert_letter(self):
|
||
letter = re.findall(r"[А-Яа-я]", self.token["obj"])[-1]
|
||
self.accumulator["address"] = self.substitue_letter(letter)
|
||
|
||
if letter and self.has_no_class("litera"):
|
||
self.accumulator["class"] += "l"
|
||
|
||
def has_letter_in(self) -> bool:
|
||
return re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"])
|
||
|
||
## Room
|
||
|
||
def substitue_room(self, number: int) -> str:
|
||
return re.sub(
|
||
r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip()
|
||
)
|
||
|
||
def insert_room(self):
|
||
number = re.findall("пом\. ?\-?\d*\w?", self.token["obj"])[-1]
|
||
self.accumulator["address"] = self.substitue_room(number)
|
||
|
||
if number and self.has_no_class("room"):
|
||
self.accumulator["class"] += "r"
|
||
|
||
# Data preprocessing
|
||
|
||
def split_tokens(self) -> list[pd.Series]:
|
||
address = self.input.replace(";", ",")
|
||
|
||
parts = address.split(",")
|
||
parts = map(str.strip, parts)
|
||
parts = filter(is_valid_token, parts)
|
||
|
||
tokens = map(lambda part: create_token(part, ""), parts)
|
||
|
||
return list(tokens)
|
||
|
||
def cut_address(self) -> pd.Series:
|
||
# fmt: off
|
||
while (
|
||
len(self.accumulator["class"]) > 0
|
||
and CLASSES.index(self.prev_class()) > CLASSES.index(self.next_class())
|
||
):
|
||
# fmt: on
|
||
match self.accumulator["class"][-1]:
|
||
case "h":
|
||
self.accumulator["addresses"] = re.sub(
|
||
r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?",
|
||
"",
|
||
self.accumulator["address"].lower(),
|
||
)
|
||
case "b":
|
||
number = re.findall(r"к{0,1}\.? ?\d", self.accumulator["address"])[
|
||
-1
|
||
]
|
||
self.accumulator["address"] = re.sub(
|
||
number, "", self.accumulator["address"]
|
||
)
|
||
case "e":
|
||
self.accumulator["address"] = re.sub(
|
||
r"cтр\.? ?\d", "", self.accumulator["address"]
|
||
)
|
||
case "l":
|
||
self.accumulator["address"] = re.sub(
|
||
r"[литера]*\.? ?[А-Яа-я]{1}$", "", self.accumulator["address"]
|
||
)
|
||
case "r":
|
||
self.accumulator["address"] = re.sub(
|
||
r"пом\.? ?\d+", "", self.accumulator["address"]
|
||
)
|
||
|
||
self.accumulator["class"] = self.accumulator["class"][:-1]
|
||
|
||
return self.accumulator
|
||
|
||
# Splitting
|
||
|
||
def split(self):
|
||
self.tokens = self.split_tokens()
|
||
|
||
result = []
|
||
|
||
self.accumulator = pd.Series({"address": "", "class": ""})
|
||
|
||
prev_token = create_token()
|
||
|
||
for cursor in self.tokens:
|
||
self.token = address_classification(cursor, prev_token)
|
||
prev_token = self.token.copy()
|
||
|
||
if self.accumulator["class"] == "":
|
||
self.accumulator = self.token.rename({"obj": "address"})
|
||
continue
|
||
|
||
if self.correct_order():
|
||
self.accumulator["address"] += " "
|
||
self.accumulator += self.token.rename({"obj": "address"})
|
||
else:
|
||
unfolded_address = unfold_house_ranges(self.accumulator["address"])
|
||
self.accumulator["address"] = unfolded_address[-1]
|
||
|
||
result.extend(unfolded_address)
|
||
|
||
self.accumulator = self.cut_address()
|
||
|
||
if self.next_is_street_or_upper():
|
||
self.accumulator = self.token.rename({"obj": "address"})
|
||
|
||
if self.next_class_is("house"):
|
||
self.accumulator["address"] = self.substitue_house()
|
||
self.pop_token_class()
|
||
|
||
if self.next_class_is("building"):
|
||
self.insert_building()
|
||
self.pop_token_class()
|
||
|
||
if self.next_class_is("edifice"):
|
||
self.insert_edifice()
|
||
self.pop_token_class()
|
||
|
||
if self.next_class_is("letter"):
|
||
self.insert_letter()
|
||
elif self.has_letter_in():
|
||
self.accumulator["address"] = self.without_letter()
|
||
|
||
if self.next_class_is("room"):
|
||
self.insert_room()
|
||
self.pop_token_class()
|
||
|
||
result.extend(unfold_house_ranges(self.accumulator["address"]))
|
||
|
||
return result
|
||
|
||
|
||
def split_pesoch_res(address: str) -> list[str]:
|
||
t = re.sub(r",", " ", address)
|
||
t = re.split(r"(Санкт-Петербург|Ленинградская обл|Л\.О)", t)
|
||
t = list(map(str.strip, filter(lambda token: token != "", t)))
|
||
tokens = [t[i] + " " + t[i + 1] for i in range(0, len(t) - 1, 2)]
|
||
|
||
if tokens:
|
||
return list(set(tokens))
|
||
return [address]
|
||
|
||
|
||
def process_row(row: pd.Series[str]) -> pd.Series[str]:
|
||
row = row.copy()
|
||
|
||
if pd.isnull(row["Улица"]):
|
||
row["Улица"] = [None]
|
||
else:
|
||
if row["РЭС"] == "Песочинский РЭС":
|
||
addresses = split_pesoch_res(row["Улица"])
|
||
else:
|
||
addresses = AddressSplitter(row["Улица"])
|
||
row["Улица"] = addresses
|
||
|
||
return row
|
||
|
||
|
||
def split_addresses(df: pd.DataFrame) -> pd.DataFrame:
|
||
merged_df = df.apply(process_row, axis=1).reset_index()
|
||
|
||
return merged_df.explode("Улица", ignore_index=True)
|