diff --git a/parser/__main__.py b/parser/__main__.py
index de668c0..5616d83 100644
--- a/parser/__main__.py
+++ b/parser/__main__.py
@@ -3,11 +3,11 @@ import time
 
 import schedule
 
-from . import pipeline
+from . import pipeline, LenenergoParser
 
 
 def job():
-    parser = pipeline()
+    parser = pipeline(LenenergoParser(file_path="./data.csv"))
     parser.save_df(f'./data_{parser.today.strftime("%d-%m-%y_%H.%M")}.csv')
 
 
diff --git a/parser/address.py b/parser/address.py
index 2d94cae..28769f7 100644
--- a/parser/address.py
+++ b/parser/address.py
@@ -2,12 +2,13 @@ from __future__ import annotations
 
 import re
 from typing import Iterable, List, TypeVar
+from collections.abc import Sequence
 
 import pandas as pd
 
 T = TypeVar("T")
 
-CLASSES = ("w", "d", "c", "t", "s", "h", "b","e", "l", "r")
+CLASSES = ("w", "d", "c", "t", "s", "h", "b", "e", "l", "r")
 DISTRICTS_PREFIXES = ("мо ", "р-н","городское","лесхоз")
 COUNTRYSIDE_PREFIXES = (
     "г", "п", "д", "гп", "рп", "кп", "пгт", "c", "хутор", " урочище")
@@ -61,7 +62,7 @@ def find_litera(token: pd.Series, pre_token: pd.Series) -> str:
         return "l"
     if (re.search(r"\b([А-Я]|[а-я]){1}$", token['obj']) \
             and ("l" in pre_token['class'] or "h" in pre_token['class'])) \
-            and not (" ш" in token["obj"]) \
+            and (" ш" not in token["obj"]) \
             and not find_countryside(token, pre_token):
         return "l"
     return ""
@@ -73,7 +74,7 @@ def find_edifice(token: pd.Series, pre_token: pd.Series) -> str:
 def find_building(token: pd.Series, pre_token: pd.Series) -> str:
     if re.search(r"\d", token['obj']) and not find_room(token,pre_token):
         if any_of_in(BUILDING_PREFIXES, token['obj'].lower()) \
-                or "b" in pre_token['class'] and not ("h" in token['class']) and not find_edifice(token,pre_token)\
+                or "b" in pre_token['class'] and ("h" not in token['class']) and not find_edifice(token,pre_token)\
                 or re.search(r"к\.* ?\d", token['obj']):
             return "b"
     return ""
@@ -155,7 +156,7 @@ def cut_address(ad: pd.Series, cl: str) -> pd.Series:
         ad["address"] = re.sub(r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "", ad["address"].lower())
     elif ad["class"][-1] == "b":
-        num = re.findall("к{0,1}\.? ?\d", ad["address"])[-1]
+        num = re.findall(r"к{0,1}\.? ?\d", ad["address"])[-1]
         ad["address"] = re.sub(num, "", ad["address"])
     elif ad["class"][-1] == "e":
         ad["address"] = re.sub(r"cтр\.? ?\d", "", ad["address"])
@@ -167,102 +168,226 @@
     return ad
 
 
-# TODO: переработать систему из if в нормальный вид
-def split_address(address: str) -> List[str]:
-    if ";" in address:
-        address = address.replace(";", ",")
-    if "," in address:
-        tokens = address.split(",")
+def is_valid_token(string: str) -> bool:
+    return string not in ("", "уг.", "д.")
 
-        t = list(map(str.strip, filter(lambda token: token != "", tokens)))
-        tokens = pd.DataFrame()
-        tokens['obj'] = t
-        for el in ("", "уг.", "д."):
-            tokens = tokens[tokens["obj"] != el]
-        tokens.insert(len(tokens.columns), "class", "")
-        res = []
-        accumulator = pd.Series(data={"address": "", "class": ""})
+
+def create_token(obj: str = "", token_class: str = ""):
+    return pd.Series(
+        {
+            "obj": obj,
+            "class": token_class,
+        }
+    )
 
-        for i in range(len(tokens)):
-            cur_tk = tokens.iloc[i]
-            if i == 0:
-                pre_token = pd.Series(data=["", ""], index=['obj', 'class'])
-            else:
-                pre_token = tokens.iloc[i - 1]
+class AddressSplitter(Sequence):
+    def __init__(self, address: str):
+        self.input = address
 
-            cur_tk = address_classification(cur_tk, pre_token)
-            tokens.iloc[i] = cur_tk
-            print(tokens.iloc[i])
+        self.addresses = self.split()
 
-            if not accumulator["class"]:
-                accumulator["class"] = cur_tk['class']
-                accumulator["address"] = cur_tk["obj"]
+    ## Sequence abstract methods implementation
+
+    def __getitem__(self, key: int):
+        if key < len(self.addresses):
+            return self.addresses[key]
+        else:
+            raise IndexError()
+
+    def __len__(self):
+        return len(self.addresses)
+
+    ## Address token class manipulations
+
+    def next_class(self) -> str:
+        return self.token["class"][0]
+
+    def correct_order(self) -> bool:
+        prev_class = self.accumulator["class"][-1]
+
+        return (
+            CLASSES.index(prev_class) < CLASSES.index(self.next_class())
+            and self.accumulator["class"] != "w"
+        )
+
+    def next_class_is(self, comparing_class: str) -> bool:
+        return len(self.token["class"]) > 0 and self.next_class() == comparing_class[0]
+
+    def has_no_class(self, comparing_class: str) -> bool:
+        return comparing_class[0] not in self.accumulator["class"]
+
+    def pop_token_class(self):
+        self.token["class"] = self.token["class"][1:]
+
+    ## Accumulator constrains
+
+    def next_is_street_or_upper(self) -> bool:
+        is_unknown_class = self.accumulator["class"] in ("", "w")
+
+        return (
+            CLASSES.index(self.next_class()) <= CLASSES.index("s") or is_unknown_class
+        )
+
+    def has_numbered_street(self) -> bool:
+        return any_of_in(("-я", "-й", "-Я"), self.accumulator["address"])
+
+    ## Accumulator manipulation
+
+    # House
+
+    def substitue_house(self) -> str:
+        house_regex = re.compile(r"\d{1,4} ?[\/\-]?\d* ?")
+
+        number = house_regex.findall(self.token['obj'])[0]
+
+        if self.has_numbered_street():
+            house_number_index = 1
+        else:
+            house_number_index = 0
+
+        number_in_accumulator = house_regex.findall(self.accumulator["address"])
+
+        if number_in_accumulator:
+            return re.sub(number_in_accumulator[house_number_index], number, self.accumulator["address"])
+        else:
+            return self.accumulator["address"]
+
+    # Building
+
+    def append_building(self, number: int) -> pd.Series:
+        self.accumulator["class"] += "b"
+        self.accumulator["address"] += "к." + number
+
+        return self.accumulator
+
+    def substitue_building(self, number: int) -> str:
+        return re.sub(r"\d$", number, self.accumulator["address"])
+
+    def insert_building(self):
+        number = re.findall(r"\d", self.token["obj"])[-1]
+
+        if number and self.has_no_class("building"):
+            self.accumulator = self.append_building(number)
+        else:
+            self.accumulator["address"] = self.substitue_building(number)
+
+    # Edifice
+
+    def substitue_edifice(self, number: int) -> str:
+        return re.sub(r"cтр\. ?\d", number, self.accumulator["address"].strip())
+
+    def insert_edifice(self):
+        number = re.findall("стр\.? ?\d", self.token["obj"])[-1]
+
+        self.accumulator["address"] = self.substitue_edifice(number)
+
+        if number and self.has_no_class("edifice"):
+            self.accumulator["class"] += "e"
+
+    # Letter
+
+    def without_letter(self) -> str:
+        return re.sub(r"[А-Яа-я]$", "", self.accumulator["address"].strip())
+
+    def substitue_letter(self, letter: str) -> str:
+        address_without_letter = self.without_letter()
+
+        return address_without_letter + letter
+
+    def insert_letter(self):
+        letter = re.findall(r"[А-Яа-я]", self.token["obj"])[-1]
+        self.accumulator["address"] = self.substitue_letter(letter)
+
+        if letter and self.has_no_class("litera"):
+            self.accumulator["class"] += "l"
+
+    def has_letter_in(self) -> bool:
+        return (
+            re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"])
+        )
+
+    # Room
+
+    def substitue_room(self, number: int) -> str:
+        return re.sub(r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip())
+
+    def insert_room(self):
+        number = re.findall("пом\. ?\-?\d*\w?", self.token["obj"])[-1]
+        self.accumulator["address"] = self.substitue_room(number)
+
+        if number and self.has_no_class("room"):
+            self.accumulator["class"] += "r"
+
+    ## Data preprocessing
+
+    def split_tokens(self) -> list[pd.Series]:
+        address = self.input.replace(";", ",")
+
+        parts = address.split(",")
+        parts = map(str.strip, parts)
+        parts = filter(is_valid_token, parts)
+
+        tokens = map(lambda part: create_token(part, ""), parts)
+
+        return list(tokens)
+
+    def split(self):
+        self.tokens = self.split_tokens()
+
+        result = []
+
+        self.accumulator = pd.Series({"address": "", "class": ""})
+
+        prev_token = create_token()
+
+        for cursor in self.tokens:
+            self.token = address_classification(cursor, prev_token)
+            prev_token = self.token.copy()
+
+            if self.accumulator["class"] == "":
+                self.accumulator = self.token.rename({"obj": "address"})
                 continue
 
-            if CLASSES.index(accumulator["class"][-1]) < CLASSES.index(cur_tk["class"][0]) and accumulator["class"]!="w":
-                accumulator["class"] += cur_tk['class']
-                accumulator["address"] += " " + cur_tk["obj"]
+            if self.correct_order():
+                self.accumulator["address"] += " "
+                self.accumulator += self.token.rename({"obj": "address"})
             else:
-                ad_no_ranges = unfold_house_ranges(accumulator["address"])
-                accumulator["address"] = ad_no_ranges[-1]
+                unfolded_address = unfold_house_ranges(self.accumulator["address"])
+                self.accumulator["address"] = unfolded_address[-1]
 
-                res.extend(ad_no_ranges)
+                result.extend(unfolded_address)
 
-                accumulator = cut_address(accumulator, cur_tk["class"])
+                self.accumulator = cut_address(self.accumulator, self.token["class"])
 
-                if not accumulator["class"] or CLASSES.index(cur_tk["class"][0]) <= CLASSES.index("s") or accumulator["class"]=="w":
-                    accumulator["class"] = cur_tk["class"]
-                    accumulator["address"] = cur_tk["obj"]
+                if self.next_is_street_or_upper():
+                    self.accumulator = self.token.rename({"obj": "address"})
 
-                if cur_tk["class"][0] == "h":
-                    num = re.findall("\d{1,4} ?[\/\-]?\d* ?", cur_tk['obj'])[0]
-                    if any_of_in(("-я", "-й", "-Я"), accumulator["address"]):
-                        idx = 1
-                    else:
-                        idx = 0
-                    num_ac = re.findall("\d{1,4} ?[\/\-]?\d* ?", accumulator["address"])
-                    if num_ac:
-                        accumulator["address"] = re.sub(num_ac[idx], num, accumulator["address"])
-                    cur_tk["class"] =cur_tk["class"][1:]
+                if self.next_class_is("house"):
+                    self.accumulator["address"] = self.substitue_house()
+                    self.pop_token_class()
 
-                if cur_tk["class"] and cur_tk["class"][0] == "b":
-                    num = re.findall("\d", cur_tk["obj"])[-1]
-                    if num and not "b" in accumulator["class"]:
-                        accumulator["class"] += "b"
-                        accumulator["address"] += "к." + num
-                    else:
-                        accumulator["address"] = re.sub(r"\d$", num, accumulator["address"])
-                    cur_tk["class"] = cur_tk["class"][1:]
+                if self.next_class_is("building"):
+                    self.insert_building()
+                    self.pop_token_class()
 
-                if cur_tk["class"] and cur_tk["class"][0] == "e":
-                    num = re.findall("стр\.? ?\d", cur_tk["obj"].strip())[-1]
-                    accumulator["address"] = re.sub(r"cтр\. ?\d", num, accumulator["address"].strip())
-                    if num and not "e" in accumulator["class"]:
-                        accumulator["class"] += "e"
-                    cur_tk["class"] = cur_tk["class"][1:]
+                if self.next_class_is("edifice"):
+                    self.insert_edifice()
+                    self.pop_token_class()
 
-                if cur_tk["class"] and cur_tk["class"][0] == "l":
-                    num = re.findall("[А-Яа-я]", cur_tk["obj"].strip())[-1]
-                    accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip())
-                    accumulator["address"] += num
-                    if num and not "l" in accumulator["class"]:
-                        accumulator["class"] += "l"
-                else:
-                    if re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", accumulator["address"]):
-                        accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip())
+                if self.next_class_is("letter"):
+                    self.insert_letter()
+                elif self.has_letter_in():
+                    self.accumulator["address"] = self.without_letter()
 
-                if cur_tk["class"] and cur_tk["class"][0] == "r":
-                    num = re.findall("пом\. ?\-?\d*\w?", cur_tk["obj"].strip())[-1]
-                    accumulator["address"] = re.sub(r"пом\. ?\d\-?\d*\w?", num, accumulator["address"].strip())
-                    if num and not "r" in accumulator["class"]:
-                        accumulator["class"] += "r"
-                    cur_tk["class"] = cur_tk["class"][1:]
-        res.extend(unfold_house_ranges(accumulator["address"]))
-        print(res)
-        return res
+                if self.next_class_is("room"):
+                    self.insert_room()
+                    self.pop_token_class()
+
+        result.extend(unfold_house_ranges(self.accumulator["address"]))
+
+        return result
 
-    return [address]
 
 def split_pesoch_res(address: str) -> List[str]:
     t = re.sub(r",", " ", address)
@@ -283,7 +408,7 @@ def process_row(row: pd.Series[str]) -> pd.Series[str]:
     if row["РЭС"] == "Песочинский РЭС":
         addresses = split_pesoch_res(row["Улица"])
     else:
-        addresses = split_address(row["Улица"])
+        addresses = AddressSplitter(row["Улица"])
 
     row["Улица"] = addresses
     return row
diff --git a/parser/util.py b/parser/util.py
index 983c9b8..9fad768 100644
--- a/parser/util.py
+++ b/parser/util.py
@@ -10,7 +10,7 @@ from . import (
 
 
 def pipeline(parser: Optional[LenenergoParser] = None) -> LenenergoParser:
    if parser is None:
-        parser = LenenergoParser(file_path = r"C:\Users\Юля\PycharmProjects\machine_learning\lenengro_parser\data_Rosseti.csv")
+        parser = LenenergoParser(parser)
 
     print(parser)