from __future__ import annotations import re from collections.abc import Sequence import pandas as pd from .classifier import CLASSES, address_classification from .utils import any_of_in, create_token, is_valid_token, unfold_house_ranges class AddressSplitter(Sequence): def __init__(self, address: str): self.input = address self.addresses = self.split() # Sequence abstract methods implementation def __getitem__(self, key: int): if key < len(self.addresses): return self.addresses[key] else: raise IndexError() def __len__(self): return len(self.addresses) # Address token class manipulations def next_class(self) -> str: return self.token["class"][0] def prev_class(self) -> str: return self.accumulator["class"][-1] def correct_order(self) -> bool: return ( len(self.accumulator["class"]) > 0 and CLASSES.index(self.prev_class()) < CLASSES.index(self.next_class()) and self.accumulator["class"] != "w" ) def next_class_is(self, comparing_class: str) -> bool: return len(self.token["class"]) > 0 and self.next_class() == comparing_class[0] def has_no_class(self, comparing_class: str) -> bool: return comparing_class[0] not in self.accumulator["class"] def pop_token_class(self): self.token["class"] = self.token["class"][1:] # Accumulator constrains def next_is_street_or_upper(self) -> bool: is_unknown_class = self.accumulator["class"] in ("", "w") return ( CLASSES.index(self.next_class()) <= CLASSES.index("s") or is_unknown_class ) def has_numbered_street(self) -> bool: return any_of_in(("-я", "-й", "-Я"), self.accumulator["address"]) # Accumulator manipulation ## House def substitue_house(self) -> str: house_regex = re.compile(r"\d{1,4} ?[\/\-]?\d* ?") number = house_regex.findall(self.token["obj"])[0] if self.has_numbered_street(): house_number_index = 1 else: house_number_index = 0 number_in_accumulator = house_regex.findall(self.accumulator["address"]) if number_in_accumulator: return re.sub( number_in_accumulator[house_number_index], number, self.accumulator["address"], ) else: return self.accumulator["address"] ## Building def append_building(self, number: int) -> pd.Series: self.accumulator["class"] += "b" self.accumulator["address"] += "к." + number return self.accumulator def substitue_building(self, number: int) -> str: return re.sub(r"\d$", number, self.accumulator["address"]) def insert_building(self): number = re.findall(r"\d", self.token["obj"])[-1] if number and self.has_no_class("building"): self.accumulator = self.append_building(number) else: self.accumulator["address"] = self.substitue_building(number) ## Edifice def substitue_edifice(self, number: int) -> str: return re.sub(r"cтр\. ?\d", number, self.accumulator["address"].strip()) def insert_edifice(self): number = re.findall("стр\.? ?\d", self.token["obj"])[-1] self.accumulator["address"] = self.substitue_edifice(number) if number and self.has_no_class("edifice"): self.accumulator["class"] += "e" ## Letter def without_letter(self) -> str: return re.sub(r"[А-Яа-я]$", "", self.accumulator["address"].strip()) def substitue_letter(self, letter: str) -> str: address_without_letter = self.without_letter() return address_without_letter + letter def insert_letter(self): letter = re.findall(r"[А-Яа-я]", self.token["obj"])[-1] self.accumulator["address"] = self.substitue_letter(letter) if letter and self.has_no_class("litera"): self.accumulator["class"] += "l" def has_letter_in(self) -> bool: return re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"]) ## Room def substitue_room(self, number: int) -> str: return re.sub( r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip() ) def insert_room(self): number = re.findall("пом\. ?\-?\d*\w?", self.token["obj"])[-1] self.accumulator["address"] = self.substitue_room(number) if number and self.has_no_class("room"): self.accumulator["class"] += "r" # Data preprocessing def split_tokens(self) -> list[pd.Series]: address = self.input.replace(";", ",") parts = address.split(",") parts = map(str.strip, parts) parts = filter(is_valid_token, parts) tokens = map(lambda part: create_token(part, ""), parts) return list(tokens) def cut_address(self) -> pd.Series: while len(self.accumulator["class"]) > 0 and CLASSES.index( self.prev_class() ) > CLASSES.index(self.next_class()): match self.accumulator["class"][-1]: case "h": self.accumulator["addresses"] = re.sub( r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "", self.accumulator["address"].lower(), ) case "b": number = re.findall(r"к{0,1}\.? ?\d", self.accumulator["address"])[ -1 ] self.accumulator["address"] = re.sub( number, "", self.accumulator["address"] ) case "e": self.accumulator["address"] = re.sub( r"cтр\.? ?\d", "", self.accumulator["address"] ) case "l": self.accumulator["address"] = re.sub( r"[литера]*\.? ?[А-Яа-я]{1}$", "", self.accumulator["address"] ) case "r": self.accumulator["address"] = re.sub( r"пом\.? ?\d+", "", self.accumulator["address"] ) self.accumulator["class"] = self.accumulator["class"][:-1] return self.accumulator # Splitting def split(self): self.tokens = self.split_tokens() result = [] self.accumulator = pd.Series({"address": "", "class": ""}) prev_token = create_token() for cursor in self.tokens: self.token = address_classification(cursor, prev_token) prev_token = self.token.copy() if self.accumulator["class"] == "": self.accumulator = self.token.rename({"obj": "address"}) continue if self.correct_order(): self.accumulator["address"] += " " self.accumulator += self.token.rename({"obj": "address"}) else: unfolded_address = unfold_house_ranges(self.accumulator["address"]) self.accumulator["address"] = unfolded_address[-1] result.extend(unfolded_address) self.accumulator = self.cut_address() if self.next_is_street_or_upper(): self.accumulator = self.token.rename({"obj": "address"}) if self.next_class_is("house"): self.accumulator["address"] = self.substitue_house() self.pop_token_class() if self.next_class_is("building"): self.insert_building() self.pop_token_class() if self.next_class_is("edifice"): self.insert_edifice() self.pop_token_class() if self.next_class_is("letter"): self.insert_letter() elif self.has_letter_in(): self.accumulator["address"] = self.without_letter() if self.next_class_is("room"): self.insert_room() self.pop_token_class() result.extend(unfold_house_ranges(self.accumulator["address"])) return result def split_pesoch_res(address: str) -> list[str]: t = re.sub(r",", " ", address) t = re.split(r"(Санкт-Петербург|Ленинградская обл|Л\.О)", t) t = list(map(str.strip, filter(lambda token: token != "", t))) tokens = [t[i] + " " + t[i + 1] for i in range(0, len(t) - 1, 2)] if tokens: return list(set(tokens)) return [address] def process_row(row: pd.Series[str]) -> pd.Series[str]: row = row.copy() if pd.isnull(row["Улица"]): row["Улица"] = [None] else: if row["РЭС"] == "Песочинский РЭС": addresses = split_pesoch_res(row["Улица"]) else: addresses = AddressSplitter(row["Улица"]) row["Улица"] = addresses return row def split_addresses(df: pd.DataFrame) -> pd.DataFrame: merged_df = df.apply(process_row, axis=1).reset_index() return merged_df.explode("Улица", ignore_index=True)