diff --git a/parser/__main__.py b/parser/__main__.py index 5616d83..b9de621 100644 --- a/parser/__main__.py +++ b/parser/__main__.py @@ -3,7 +3,7 @@ import time import schedule -from . import pipeline, LenenergoParser +from . import LenenergoParser, pipeline def job(): diff --git a/parser/address.py b/parser/address.py index 28769f7..d9111dd 100644 --- a/parser/address.py +++ b/parser/address.py @@ -1,28 +1,90 @@ from __future__ import annotations import re -from typing import Iterable, List, TypeVar from collections.abc import Sequence +from typing import Iterable, List, TypeVar import pandas as pd T = TypeVar("T") CLASSES = ("w", "d", "c", "t", "s", "h", "b", "e", "l", "r") -DISTRICTS_PREFIXES = ("мо ", "р-н","городское","лесхоз") +DISTRICTS_PREFIXES = ("мо ", "р-н", "городское", "лесхоз") COUNTRYSIDE_PREFIXES = ( - "г", "п", "д", "гп", "рп", "кп", "пгт", "c", "хутор", " урочище") + "г", + "п", + "д", + "гп", + "рп", + "кп", + "пгт", + "c", + "хутор", + " урочище", +) TERRITORY_PREFIXES = ( -"тер.", " тер", "снт ", "ст ", "дск ", "днп ", "дпк ", "нп ", "пдк ", "т/б ", "садоводство", "массив", "хозя", "сад-во") + "тер.", + " тер", + "снт ", + "ст ", + "дск ", + "днп ", + "дпк ", + "нп ", + "пдк ", + "т/б ", + "садоводство", + "массив", + "хозя", + "сад-во", +) STREET_PREFIXES = ( - " ул", " бул", " пр", " ш", " пер", " дор", " маг", " наб", " пл", " просп", " туп", "шоссе", "лини", "аллея", - "мост", " парк", "кольцо", "проезд", "съезд","переулок", - "ул.", "бул.", "пр.", "ш.", "пер.", "дор.", "маг.", "наб.", "пл.", "просп.", "туп.") -HOUSES_PREFIXES = ("д.", "уч.", "участок", "мкд", "тп","дом","дома") -BUILDING_PREFIXES = ("к.", "к ","корп", "корпус") -EDIFICE_PREFIXES=("стр.", "строение") + " ул", + " бул", + " пр", + " ш", + " пер", + " дор", + " маг", + " наб", + " пл", + " просп", + " туп", + "шоссе", + "лини", + "аллея", + "мост", + " парк", + "кольцо", + "проезд", + "съезд", + "переулок", + "ул.", + "бул.", + "пр.", + "ш.", + "пер.", + "дор.", + "маг.", + "наб.", + "пл.", + "просп.", + "туп.", +) +HOUSES_PREFIXES = ("д.", "уч.", "участок", "мкд", "тп", "дом", "дома") +BUILDING_PREFIXES = ("к.", "к ", "корп", "корпус") +EDIFICE_PREFIXES = ("стр.", "строение") LETTER = ("лит.", "литера", " л.") -PREFIXES = (DISTRICTS_PREFIXES, COUNTRYSIDE_PREFIXES, TERRITORY_PREFIXES, STREET_PREFIXES, HOUSES_PREFIXES, BUILDING_PREFIXES, EDIFICE_PREFIXES,LETTER) +PREFIXES = ( + DISTRICTS_PREFIXES, + COUNTRYSIDE_PREFIXES, + TERRITORY_PREFIXES, + STREET_PREFIXES, + HOUSES_PREFIXES, + BUILDING_PREFIXES, + EDIFICE_PREFIXES, + LETTER, +) def unfold_house_ranges(token: str) -> List[str]: @@ -33,13 +95,17 @@ def unfold_house_ranges(token: str) -> List[str]: a, b = int(a), int(b) if b > a: - addresses += [re.sub(r"([\d]+-[\d]+)", number, token) for number in map(str, range(a, b + 1))] + addresses += [ + re.sub(r"([\d]+-[\d]+)", number, token) + for number in map(str, range(a, b + 1)) + ] else: token = token.replace("-", "/") if not addresses: addresses.append(token) return addresses + def any_of_in(substrings: Iterable[str], string: str) -> bool: return any(map(lambda substring: substring in string, substrings)) @@ -49,7 +115,7 @@ def flatten(arr: Iterable[List[T]]) -> List[T]: def find_room(token: pd.Series, pre_token: pd.Series) -> str: - if re.search(r"\bпом\.?", token['obj']): + if re.search(r"\bпом\.?", token["obj"]): return "r" return "" @@ -57,80 +123,109 @@ def find_room(token: pd.Series, pre_token: pd.Series) -> str: def find_litera(token: pd.Series, pre_token: pd.Series) -> str: if find_room(token, pre_token): return "" - if any_of_in(LETTER, token['obj'].lower()) \ - or re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", token['obj']): + if any_of_in(LETTER, token["obj"].lower()) or re.search( + r"\d{1,3}([А-Я]|[а-я])( |$)", token["obj"] + ): return "l" - if (re.search(r"\b([А-Я]|[а-я]){1}$", token['obj']) \ - and ("l" in pre_token['class'] or "h" in pre_token['class'])) \ - and (" ш" not in token["obj"]) \ - and not find_countryside(token, pre_token): + if ( + ( + re.search(r"\b([А-Я]|[а-я]){1}$", token["obj"]) + and ("l" in pre_token["class"] or "h" in pre_token["class"]) + ) + and (" ш" not in token["obj"]) + and not find_countryside(token, pre_token) + ): return "l" return "" + + def find_edifice(token: pd.Series, pre_token: pd.Series) -> str: - if any_of_in(EDIFICE_PREFIXES, token['obj'].lower()): + if any_of_in(EDIFICE_PREFIXES, token["obj"].lower()): return "e" return "" + def find_building(token: pd.Series, pre_token: pd.Series) -> str: - if re.search(r"\d", token['obj']) and not find_room(token,pre_token): - if any_of_in(BUILDING_PREFIXES, token['obj'].lower()) \ - or "b" in pre_token['class'] and ("h" not in token['class']) and not find_edifice(token,pre_token)\ - or re.search(r"к\.* ?\d", token['obj']): + if re.search(r"\d", token["obj"]) and not find_room(token, pre_token): + if ( + any_of_in(BUILDING_PREFIXES, token["obj"].lower()) + or "b" in pre_token["class"] + and ("h" not in token["class"]) + and not find_edifice(token, pre_token) + or re.search(r"к\.* ?\d", token["obj"]) + ): return "b" return "" def find_house(token: pd.Series, pre_token: pd.Series) -> str: - if re.search(r"\d{1,4}", token['obj']) and not find_room(token,pre_token): - if any_of_in(HOUSES_PREFIXES, token['obj'].lower()): + if re.search(r"\d{1,4}", token["obj"]) and not find_room(token, pre_token): + if any_of_in(HOUSES_PREFIXES, token["obj"].lower()): return "h" - if re.search(r"(д|д\.) ?\d{1,4} ?\/*\d* ?", token['obj']): + if re.search(r"(д|д\.) ?\d{1,4} ?\/*\d* ?", token["obj"]): return "h" - if ("s" in pre_token['class'] or "h" in pre_token['class'] or "s" in token['class']) \ - and not any_of_in(("-я", "-й", "-Я"), token['obj']) \ - and not find_building(token, pre_token)\ - and not find_edifice(token,pre_token): + if ( + ( + "s" in pre_token["class"] + or "h" in pre_token["class"] + or "s" in token["class"] + ) + and not any_of_in(("-я", "-й", "-Я"), token["obj"]) + and not find_building(token, pre_token) + and not find_edifice(token, pre_token) + ): return "h" - if find_building(token, pre_token) \ - and not any_of_in(("-я", "-й", "-Я"), token['obj']) \ - and True: - if len(re.findall(r"\d{1,4}", token['obj'])) > 1: + if ( + find_building(token, pre_token) + and not any_of_in(("-я", "-й", "-Я"), token["obj"]) + and True + ): + if len(re.findall(r"\d{1,4}", token["obj"])) > 1: return "h" - if int(re.search(r"\d{1,4}", token['obj']).group()) // 10 >0: + if int(re.search(r"\d{1,4}", token["obj"]).group()) // 10 > 0: return "h" return "" def find_street(token: pd.Series, pre_token: pd.Series) -> str: - if any_of_in(STREET_PREFIXES, token['obj'].lower()): + if any_of_in(STREET_PREFIXES, token["obj"].lower()): return "s" - if re.search(r"\b[А-Яа-я]{4,}\b", token['obj']) \ - and not any([el in token["obj"].lower() for pr in PREFIXES for el in pr if len(el)>2]) \ - and not ("d" in token["class"] or "t" in token["class"] or "c" in token["class"]): + if ( + re.search(r"\b[А-Яа-я]{4,}\b", token["obj"]) + and not any( + [el in token["obj"].lower() for pr in PREFIXES for el in pr if len(el) > 2] + ) + and not ( + "d" in token["class"] or "t" in token["class"] or "c" in token["class"] + ) + ): return "s" return "" def find_territory(token: pd.Series, pre_token: pd.Series) -> str: - if any_of_in(TERRITORY_PREFIXES, token['obj'].lower()): + if any_of_in(TERRITORY_PREFIXES, token["obj"].lower()): return "t" return "" def find_countryside(token: pd.Series, pre_token: pd.Series) -> str: - if any_of_in(COUNTRYSIDE_PREFIXES, token['obj'].lower()) \ - and re.search(r"\b[гпдрпктc]{1,3}(\b|\. )", token['obj']) \ - and not find_house(token, pre_token) \ - and not any_of_in(STREET_PREFIXES, token['obj'].lower()): + if ( + any_of_in(COUNTRYSIDE_PREFIXES, token["obj"].lower()) + and re.search(r"\b[гпдрпктc]{1,3}(\b|\. )", token["obj"]) + and not find_house(token, pre_token) + and not any_of_in(STREET_PREFIXES, token["obj"].lower()) + ): return "c" return "" def find_district(token: pd.Series, pre_token: pd.Series) -> str: - if any_of_in(DISTRICTS_PREFIXES, token['obj'].lower()): + if any_of_in(DISTRICTS_PREFIXES, token["obj"].lower()): return "d" return "" + def address_classification(token: pd.Series, pre_token: pd.Series) -> pd.Series: brackets = re.search(r"\(.+\)", token["obj"]) if brackets: @@ -144,17 +239,19 @@ def address_classification(token: pd.Series, pre_token: pd.Series) -> pd.Series: token["class"] += find_edifice(token, pre_token) token["class"] += find_litera(token, pre_token) token["class"] += find_room(token, pre_token) - if token['class'] == "": - token['class'] = "w" + if token["class"] == "": + token["class"] = "w" if brackets: token["obj"] = re.sub(r"\(\)", brackets.group(), token["obj"]) return token + def cut_address(ad: pd.Series, cl: str) -> pd.Series: while ad["class"] and CLASSES.index(ad["class"][-1]) > CLASSES.index(cl[0]): if ad["class"][-1] == "h": - ad["address"] = re.sub(r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "", - ad["address"].lower()) + ad["address"] = re.sub( + r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "", ad["address"].lower() + ) elif ad["class"][-1] == "b": num = re.findall(r"к{0,1}\.? ?\d", ad["address"])[-1] ad["address"] = re.sub(num, "", ad["address"]) @@ -239,7 +336,7 @@ class AddressSplitter(Sequence): def substitue_house(self) -> str: house_regex = re.compile(r"\d{1,4} ?[\/\-]?\d* ?") - number = house_regex.findall(self.token['obj'])[0] + number = house_regex.findall(self.token["obj"])[0] if self.has_numbered_street(): house_number_index = 1 @@ -249,7 +346,11 @@ class AddressSplitter(Sequence): number_in_accumulator = house_regex.findall(self.accumulator["address"]) if number_in_accumulator: - return re.sub(number_in_accumulator[house_number_index], number, self.accumulator["address"]) + return re.sub( + number_in_accumulator[house_number_index], + number, + self.accumulator["address"], + ) else: return self.accumulator["address"] @@ -303,19 +404,19 @@ class AddressSplitter(Sequence): self.accumulator["class"] += "l" def has_letter_in(self) -> bool: - return ( - re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"]) - ) + return re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"]) # Room def substitue_room(self, number: int) -> str: - return re.sub(r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip()) + return re.sub( + r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip() + ) def insert_room(self): number = re.findall("пом\. ?\-?\d*\w?", self.token["obj"])[-1] self.accumulator["address"] = self.substitue_room(number) - + if number and self.has_no_class("room"): self.accumulator["class"] += "r" @@ -393,12 +494,13 @@ def split_pesoch_res(address: str) -> List[str]: t = re.sub(r",", " ", address) t = re.split(r"(Санкт-Петербург|Ленинградская обл|Л\.О)", t) t = list(map(str.strip, filter(lambda token: token != "", t))) - tokens = [t[i] + " " + t[i+1] for i in range(0, len(t)-1, 2)] + tokens = [t[i] + " " + t[i + 1] for i in range(0, len(t) - 1, 2)] if tokens: return list(set(tokens)) return [address] + def process_row(row: pd.Series[str]) -> pd.Series[str]: row = row.copy() @@ -417,4 +519,4 @@ def process_row(row: pd.Series[str]) -> pd.Series[str]: def split_addresses(df: pd.DataFrame) -> pd.DataFrame: merged_df = df.apply(process_row, axis=1).reset_index() - return merged_df.explode("Улица", ignore_index=True) \ No newline at end of file + return merged_df.explode("Улица", ignore_index=True)