diff --git a/parser/__main__.py b/parser/__main__.py index de668c0..5616d83 100644 --- a/parser/__main__.py +++ b/parser/__main__.py @@ -3,11 +3,11 @@ import time import schedule -from . import pipeline +from . import pipeline, LenenergoParser def job(): - parser = pipeline() + parser = pipeline(LenenergoParser(file_path="./data.csv")) parser.save_df(f'./data_{parser.today.strftime("%d-%m-%y_%H.%M")}.csv') diff --git a/parser/address.py b/parser/address.py index 73f53e4..4239bf7 100644 --- a/parser/address.py +++ b/parser/address.py @@ -2,6 +2,7 @@ from __future__ import annotations import re from typing import Iterable, List, TypeVar +from collections.abc import Sequence import pandas as pd @@ -146,82 +147,177 @@ def cut_address(ad: pd.Series, cl: str) -> pd.Series: return ad -# TODO: переработать систему из if в нормальный вид -def split_address(address: str) -> List[str]: - if ";" in address: - address = address.replace(";", ",") - if "," in address: - tokens = address.split(",") +def is_nonempty_str(string: str) -> bool: + return string != "" - t = list(map(str.strip, filter(lambda token: token != "", tokens))) - tokens = pd.DataFrame() - tokens['obj'] = t - tokens = tokens[tokens["obj"] != ""] - tokens.insert(len(tokens.columns), "class", "") - res = [] - accumulator = pd.Series(data={"address": "", "class": ""}) +def create_token(obj: str = "", token_class: str = ""): + return pd.Series( + { + "obj": obj, + "class": token_class, + } + ) - for i in range(len(tokens)): - cur_tk = tokens.iloc[i] - if i == 0: - pre_token = pd.Series(data=["", ""], index=['obj', 'class']) - else: - pre_token = tokens.iloc[i - 1] +class AddressSplitter(Sequence): + addresses: list[str] + tokens: list[pd.Series] - cur_tk = address_classification(cur_tk, pre_token) - tokens.iloc[i] = cur_tk - print(tokens.iloc[i]) + def __init__(self, address: str): + self.input = address - if not accumulator["class"]: - accumulator["class"] = cur_tk['class'] - accumulator["address"] = cur_tk["obj"] 
+ self.addresses = self.split() + + if len(self.addresses) == 0: + self.addresses = [address] + + # Sequence abstract methods implementation + + def __getitem__(self, key: int): + if key < len(self.addresses): + return self.addresses[key] + else: + raise IndexError() + + def __len__(self): + return len(self.addresses) + + # Address token class manipulations + + def next_class(self) -> str: + return self.token["class"][0] + + def correct_order(self) -> bool: + prev_class = self.accumulator["class"][-1] + + return ( + CLASSES.index(prev_class) < CLASSES.index(self.next_class()) + and self.accumulator["class"] != "w" + ) + + def next_class_is(self, comparing_class: str) -> bool: + return len(self.token["class"]) > 0 and self.next_class() == comparing_class[0] + + def pop_token_class(self): + self.token["class"] = self.token["class"][1:] + + def has_no_class(self, comparing_class: str) -> bool: + return comparing_class[0] not in self.accumulator["class"] + + def next_is_street_or_upper(self) -> bool: + is_unknown_class = self.accumulator["class"] in ("", "w") + + return ( + CLASSES.index(self.next_class()) <= CLASSES.index("s") or is_unknown_class + ) + + # Accumulator manipulation + + def substitue_house(self) -> str: + num = re.findall(r"\d{1,4} ?\/?\d* ?", self.token["obj"])[0] + + return re.sub(r"\d{1,4} ?\/*\d* ?", num, self.accumulator["address"]) + + def append_building(self, num: int) -> pd.Series: + self.accumulator["class"] += "b" + self.accumulator["address"] += "к." 
+ num + + return self.accumulator + + def substitue_building(self, num: int) -> str: + return re.sub(r"\d$", num, self.accumulator["address"]) + + def insert_building(self): + number = re.findall(r"\d", self.token["obj"])[-1] + + if number and self.has_no_class("building"): + self.accumulator = self.append_building(number) + else: + self.accumulator["address"] = self.substitue_building(number) + + def without_letter(self) -> str: + return re.sub(r"[А-Яа-я]$", "", self.accumulator["address"].strip()) + + def substitue_letter(self, letter: str) -> str: + address_without_letter = self.without_letter() + + return address_without_letter + letter + + def insert_letter(self): + letter = re.findall(r"[А-Яа-я]", self.token["obj"].strip())[-1] + self.accumulator["address"] = self.substitue_letter(letter) + + if letter and self.has_no_class("litera"): + self.accumulator["class"] += "l" + + def has_letter_in(self) -> bool: + return ( + re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"]) + is not None + ) + + # Data preprocessing + + def split_tokens(self) -> list[pd.Series]: + address = self.input.replace(";", ",") + + parts = address.split(",") + parts = map(str.strip, parts) + parts = filter(is_nonempty_str, parts) + + tokens = map(lambda part: create_token(part, ""), parts) + + return list(tokens) + + def split(self): + self.tokens = self.split_tokens() + + result = [] + + self.accumulator = pd.Series({"address": "", "class": ""}) + + prev_token = create_token() + + for cursor in self.tokens: + self.token = address_classification(cursor, prev_token) + prev_token = self.token.copy() + + if self.accumulator["class"] == "": + self.accumulator = self.token.rename({"obj": "address"}) continue - if CLASSES.index(accumulator["class"][-1]) < CLASSES.index(cur_tk["class"][0]) and accumulator["class"]!="w": - accumulator["class"] += cur_tk['class'] - accumulator["address"] += " " + cur_tk["obj"] + if self.correct_order(): + self.accumulator["address"] += " " + 
self.accumulator += self.token.rename({"obj": "address"}) else: - ad_no_ranges = unfold_house_ranges(accumulator["address"]) - accumulator["address"] = ad_no_ranges[-1] + unfolded_address = unfold_house_ranges(self.accumulator["address"]) + self.accumulator["address"] = unfolded_address[-1] - res.extend(ad_no_ranges) + result.extend(unfolded_address) - accumulator = cut_address(accumulator, cur_tk["class"]) + self.accumulator = cut_address(self.accumulator, self.token["class"]) - if not accumulator["class"] or CLASSES.index(cur_tk["class"][0]) <= CLASSES.index("s") or accumulator["class"]=="w": - accumulator["class"] = cur_tk["class"] - accumulator["address"] = cur_tk["obj"] + if self.next_is_street_or_upper(): + self.accumulator = self.token.rename({"obj": "address"}) - if cur_tk["class"][0] == "h": - num = re.findall("\d{1,4} ?\/?\d* ?", cur_tk['obj'])[0] - accumulator["address"] = re.sub(r"\d{1,4} ?\/*\d* ?", num, accumulator["address"]) - cur_tk["class"] =cur_tk["class"][1:] + if self.next_class_is("house"): + self.accumulator["address"] = self.substitue_house() + self.pop_token_class() - if cur_tk["class"] and cur_tk["class"][0] == "b": - num = re.findall("\d", cur_tk["obj"])[-1] - if num and not "b" in accumulator["class"]: - accumulator["class"] += "b" - accumulator["address"] += "к." 
+ num - else: - accumulator["address"] = re.sub(r"\d$", num, accumulator["address"]) - cur_tk["class"] = cur_tk["class"][1:] + if self.next_class_is("building"): + self.insert_building() + self.pop_token_class() - if cur_tk["class"] and cur_tk["class"][0] == "l": - num = re.findall("[А-Яа-я]", cur_tk["obj"].strip())[-1] - accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip()) - accumulator["address"] += num - if num and not "l" in accumulator["class"]: - accumulator["class"] += "l" - else: - if re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", accumulator["address"]): - accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip()) - res.extend(unfold_house_ranges(accumulator["address"])) - print(res) - return res + if self.next_class_is("letter"): + self.insert_letter() + elif self.has_letter_in(): + self.accumulator["address"] = self.without_letter() + + result.extend(unfold_house_ranges(self.accumulator["address"])) + + return result - return [address] def split_pesoch_res(address: str) -> List[str]: t = re.sub(r",", " ", address) @@ -242,7 +338,7 @@ def process_row(row: pd.Series[str]) -> pd.Series[str]: if row["РЭС"] == "Песочинский РЭС": addresses = split_pesoch_res(row["Улица"]) else: - addresses = split_address(row["Улица"]) + addresses = AddressSplitter(row["Улица"]) row["Улица"] = addresses return row diff --git a/parser/util.py b/parser/util.py index 983c9b8..9fad768 100644 --- a/parser/util.py +++ b/parser/util.py @@ -10,7 +10,7 @@ from . import ( def pipeline(parser: Optional[LenenergoParser] = None) -> LenenergoParser: if parser is None: - parser = LenenergoParser(file_path = r"C:\Users\Юля\PycharmProjects\machine_learning\lenengro_parser\data_Rosseti.csv") + parser = LenenergoParser(file_path="./data.csv") print(parser)