Merge branch 'splitter_refactor' into dev

This commit is contained in:
Dmitriy Shishkov 2023-10-29 12:19:47 +03:00
commit 662c2541db
Signed by: dm1sh
GPG Key ID: 027994B0AA357688
3 changed files with 212 additions and 87 deletions

View File

@ -3,11 +3,11 @@ import time
import schedule import schedule
from . import pipeline from . import pipeline, LenenergoParser
def job(): def job():
parser = pipeline() parser = pipeline(LenenergoParser(file_path="./data.csv"))
parser.save_df(f'./data_{parser.today.strftime("%d-%m-%y_%H.%M")}.csv') parser.save_df(f'./data_{parser.today.strftime("%d-%m-%y_%H.%M")}.csv')

View File

@ -2,12 +2,13 @@ from __future__ import annotations
import re import re
from typing import Iterable, List, TypeVar from typing import Iterable, List, TypeVar
from collections.abc import Sequence
import pandas as pd import pandas as pd
T = TypeVar("T") T = TypeVar("T")
CLASSES = ("w", "d", "c", "t", "s", "h", "b","e", "l", "r") CLASSES = ("w", "d", "c", "t", "s", "h", "b", "e", "l", "r")
DISTRICTS_PREFIXES = ("мо ", "р","городское","лесхоз") DISTRICTS_PREFIXES = ("мо ", "р","городское","лесхоз")
COUNTRYSIDE_PREFIXES = ( COUNTRYSIDE_PREFIXES = (
"г", "п", "д", "гп", "рп", "кп", "пгт", "c", "хутор", " урочище") "г", "п", "д", "гп", "рп", "кп", "пгт", "c", "хутор", " урочище")
@ -61,7 +62,7 @@ def find_litera(token: pd.Series, pre_token: pd.Series) -> str:
return "l" return "l"
if (re.search(r"\b([А-Я]|[а-я]){1}$", token['obj']) \ if (re.search(r"\b([А-Я]|[а-я]){1}$", token['obj']) \
and ("l" in pre_token['class'] or "h" in pre_token['class'])) \ and ("l" in pre_token['class'] or "h" in pre_token['class'])) \
and not (" ш" in token["obj"]) \ and (" ш" not in token["obj"]) \
and not find_countryside(token, pre_token): and not find_countryside(token, pre_token):
return "l" return "l"
return "" return ""
@ -73,7 +74,7 @@ def find_edifice(token: pd.Series, pre_token: pd.Series) -> str:
def find_building(token: pd.Series, pre_token: pd.Series) -> str: def find_building(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\d", token['obj']) and not find_room(token,pre_token): if re.search(r"\d", token['obj']) and not find_room(token,pre_token):
if any_of_in(BUILDING_PREFIXES, token['obj'].lower()) \ if any_of_in(BUILDING_PREFIXES, token['obj'].lower()) \
or "b" in pre_token['class'] and not ("h" in token['class']) and not find_edifice(token,pre_token)\ or "b" in pre_token['class'] and ("h" not in token['class']) and not find_edifice(token,pre_token)\
or re.search(r"к\.* ?\d", token['obj']): or re.search(r"к\.* ?\d", token['obj']):
return "b" return "b"
return "" return ""
@ -155,7 +156,7 @@ def cut_address(ad: pd.Series, cl: str) -> pd.Series:
ad["address"] = re.sub(r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "", ad["address"] = re.sub(r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "",
ad["address"].lower()) ad["address"].lower())
elif ad["class"][-1] == "b": elif ad["class"][-1] == "b":
num = re.findall("к{0,1}\.? ?\d", ad["address"])[-1] num = re.findall(r"к{0,1}\.? ?\d", ad["address"])[-1]
ad["address"] = re.sub(num, "", ad["address"]) ad["address"] = re.sub(num, "", ad["address"])
elif ad["class"][-1] == "e": elif ad["class"][-1] == "e":
ad["address"] = re.sub(r"р\.? ?\d", "", ad["address"]) ad["address"] = re.sub(r"р\.? ?\d", "", ad["address"])
@ -167,102 +168,226 @@ def cut_address(ad: pd.Series, cl: str) -> pd.Series:
return ad return ad
# TODO: переработать систему из if в нормальный вид def is_valid_token(string: str) -> bool:
def split_address(address: str) -> List[str]: return string not in ("", "уг.", "д.")
if ";" in address:
address = address.replace(";", ",")
if "," in address:
tokens = address.split(",")
t = list(map(str.strip, filter(lambda token: token != "", tokens)))
tokens = pd.DataFrame() def create_token(obj: str = "", token_class: str = ""):
tokens['obj'] = t return pd.Series(
for el in ("", "уг.", "д."): {
tokens = tokens[tokens["obj"] != el] "obj": obj,
tokens.insert(len(tokens.columns), "class", "") "class": token_class,
res = [] }
accumulator = pd.Series(data={"address": "", "class": ""}) )
for i in range(len(tokens)):
cur_tk = tokens.iloc[i]
if i == 0: class AddressSplitter(Sequence):
pre_token = pd.Series(data=["", ""], index=['obj', 'class']) def __init__(self, address: str):
else: self.input = address
pre_token = tokens.iloc[i - 1]
cur_tk = address_classification(cur_tk, pre_token) self.addresses = self.split()
tokens.iloc[i] = cur_tk
print(tokens.iloc[i])
if not accumulator["class"]: ## Sequence abstract methods implementation
accumulator["class"] = cur_tk['class']
accumulator["address"] = cur_tk["obj"] def __getitem__(self, key: int):
if key < len(self.addresses):
return self.addresses[key]
else:
raise IndexError()
def __len__(self):
    """Return the number of entries held in ``self.addresses``."""
    stored = self.addresses
    return len(stored)
## Address token class manipulations
def next_class(self) -> str:
    """Return the first character of the current token's ``"class"`` string.

    Raises:
        IndexError: if the token's class string is empty.
    """
    token_classes = self.token["class"]
    return token_classes[0]
def correct_order(self) -> bool:
    """Tell whether the current token may be appended to the accumulator.

    True when the last class letter of the accumulator sits strictly before
    the token's first class letter in ``CLASSES`` and the accumulator's
    class string is not exactly ``"w"``.
    """
    last_accumulated = self.accumulator["class"][-1]
    accumulated_rank = CLASSES.index(last_accumulated)
    incoming_rank = CLASSES.index(self.next_class())
    if not accumulated_rank < incoming_rank:
        return False
    return self.accumulator["class"] != "w"
def next_class_is(self, comparing_class: str) -> bool:
    """Check whether the token's next class letter equals the first letter of *comparing_class*.

    Returns False when the token has no class letters left.
    """
    if not len(self.token["class"]) > 0:
        return False
    wanted = comparing_class[0]
    return self.next_class() == wanted
def has_no_class(self, comparing_class: str) -> bool:
    """Return True when the first letter of *comparing_class* is absent from the accumulator's class string."""
    marker = comparing_class[0]
    accumulated_classes = self.accumulator["class"]
    return marker not in accumulated_classes
def pop_token_class(self):
    """Drop the first class letter from the current token's class string."""
    remaining = self.token["class"][1:]
    self.token["class"] = remaining
## Accumulator constraints
def next_is_street_or_upper(self) -> bool:
    """Tell whether the accumulator should be restarted from the current token.

    True when the token's first class letter is at or before ``"s"`` in
    ``CLASSES``, or when the accumulator's class string is ``""`` or ``"w"``.
    """
    accumulated_classes = self.accumulator["class"]
    is_unknown_class = accumulated_classes in ("", "w")

    incoming_rank = CLASSES.index(self.next_class())
    street_rank = CLASSES.index("s")
    if incoming_rank <= street_rank:
        return True
    return is_unknown_class
def has_numbered_street(self) -> bool:
    """Check the accumulated address against the marker tuple via ``any_of_in``."""
    # NOTE(review): all three markers are empty strings in this view of the
    # file; possibly characters were lost in rendering. Confirm the intended values.
    markers = ("", "", "")
    accumulated_address = self.accumulator["address"]
    return any_of_in(markers, accumulated_address)
## Accumulator manipulation
# House
def substitue_house(self) -> str:
    """Return the accumulated address with its house number replaced by the token's.

    The first number found in the current token replaces one of the numbers
    found in the accumulated address: the second one when
    ``has_numbered_street()`` is true, the first one otherwise. When the
    accumulated address does not contain that many numbers, it is returned
    unchanged instead of raising ``IndexError``.

    Raises:
        IndexError: if the current token contains no number.
    """
    house_regex = re.compile(r"\d{1,4} ?[\/\-]?\d* ?")
    number = house_regex.findall(self.token['obj'])[0]

    # presumably the first number of a numbered street belongs to the street
    # name, so the house number is the second match; verify against callers
    house_number_index = 1 if self.has_numbered_street() else 0

    address = self.accumulator["address"]
    numbers_in_accumulator = house_regex.findall(address)
    if len(numbers_in_accumulator) <= house_number_index:
        # Nothing to substitute (previously an IndexError for index 1).
        return address

    # The matched text only contains digits, spaces, "/" and "-", so a
    # literal replace is equivalent to using it as a regex pattern.
    return address.replace(numbers_in_accumulator[house_number_index], number)
# Building
def append_building(self, number: str) -> pd.Series:
    """Append a building suffix to the accumulator and return the accumulator.

    Adds ``"b"`` to the accumulator's class string and ``"к." + number`` to
    its address. The accumulator is modified in place.

    Args:
        number: building number as text; it is concatenated to a string, so
            it must be a ``str`` (the previous ``int`` annotation was wrong).
    """
    self.accumulator["class"] += "b"
    self.accumulator["address"] += "к." + number

    return self.accumulator
def substitue_building(self, number: str) -> str:
    """Return the accumulated address with its trailing digit replaced by *number*.

    Args:
        number: replacement text handed to ``re.sub``; it must be a ``str``
            (the previous ``int`` annotation was wrong).

    Returns:
        The new address string; the accumulator itself is not modified.
    """
    # Only a single digit at the very end of the address is replaced.
    return re.sub(r"\d$", number, self.accumulator["address"])
def insert_building(self):
    """Put the building number of the current token into the accumulator.

    The last digit found in the token is appended through
    ``append_building`` when the accumulator has no ``"b"`` class yet;
    otherwise it replaces the existing one through ``substitue_building``.

    Raises:
        IndexError: if the token contains no digit.
    """
    digits = re.findall(r"\d", self.token["obj"])
    number = digits[-1]

    if not (number and self.has_no_class("building")):
        self.accumulator["address"] = self.substitue_building(number)
        return

    self.accumulator = self.append_building(number)
# Edifice
def substitue_edifice(self, number: str) -> str:
    """Return the stripped accumulated address with its edifice part replaced by *number*.

    Args:
        number: replacement text handed to ``re.sub``; it must be a ``str``
            (the previous ``int`` annotation was wrong).

    Returns:
        The new address string; the accumulator itself is not modified.
    """
    # NOTE(review): the pattern only covers "р." + optional space + digit, so
    # any letters before "р" stay in the address. Confirm that this matches
    # the format in which edifices are stored.
    return re.sub(r"р\. ?\d", number, self.accumulator["address"].strip())
def insert_edifice(self):
    """Put the edifice part of the current token into the accumulator.

    The last ``стр`` match in the token is passed to ``substitue_edifice``
    and the result stored as the accumulator's address; ``"e"`` is added to
    the accumulator's class string if it is not there yet.

    Raises:
        IndexError: if the token contains no edifice match.
    """
    # Raw string: "\." and "\d" are invalid escapes in a plain literal.
    number = re.findall(r"стр\.? ?\d", self.token["obj"])[-1]
    self.accumulator["address"] = self.substitue_edifice(number)

    if number and self.has_no_class("edifice"):
        self.accumulator["class"] += "e"
# Letter
def without_letter(self) -> str:
    """Return the stripped accumulated address without a trailing Cyrillic letter (А-Я, а-я)."""
    trimmed_address = self.accumulator["address"].strip()
    return re.sub(r"[А-Яа-я]$", "", trimmed_address)
def substitue_letter(self, letter: str) -> str:
    """Return the result of ``without_letter()`` with *letter* appended."""
    return self.without_letter() + letter
def insert_letter(self):
    """Put the letter of the current token into the accumulator.

    The last Cyrillic letter (А-Я, а-я) found in the token is passed to
    ``substitue_letter`` and the result stored as the accumulator's address;
    ``"l"`` is added to the accumulator's class string if it is not there yet.

    Raises:
        IndexError: if the token contains no such letter.
    """
    cyrillic_letters = re.findall(r"[А-Яа-я]", self.token["obj"])
    letter = cyrillic_letters[-1]
    self.accumulator["address"] = self.substitue_letter(letter)

    if not letter:
        return
    if self.has_no_class("litera"):
        self.accumulator["class"] += "l"
def has_letter_in(self) -> bool:
    """Tell whether the accumulated address has a number directly followed by a letter.

    Looks for one to three digits immediately followed by a single Cyrillic
    letter (А-Я, а-я) and then a space or the end of the string.

    Returns:
        A real ``bool``, matching the annotation; previously the raw
        ``re.Match`` object or ``None`` was returned.
    """
    match = re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"])
    return match is not None
# Room
def substitue_room(self, number: str) -> str:
    """Return the stripped accumulated address with its ``пом.`` part replaced by *number*.

    Args:
        number: replacement text handed to ``re.sub``; it must be a ``str``
            (the previous ``int`` annotation was wrong).

    Returns:
        The new address string; the accumulator itself is not modified.
    """
    return re.sub(r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip())
def insert_room(self):
    """Put the room part of the current token into the accumulator.

    The last ``пом.`` match in the token is passed to ``substitue_room`` and
    the result stored as the accumulator's address; ``"r"`` is added to the
    accumulator's class string if it is not there yet.

    Raises:
        IndexError: if the token contains no room match.
    """
    # Raw string: "\.", "\-", "\d" and "\w" are invalid escapes in a plain literal.
    number = re.findall(r"пом\. ?\-?\d*\w?", self.token["obj"])[-1]
    self.accumulator["address"] = self.substitue_room(number)

    if number and self.has_no_class("room"):
        self.accumulator["class"] += "r"
## Data preprocessing
def split_tokens(self) -> list[pd.Series]:
    """Split ``self.input`` into tokens with an empty class.

    Semicolons are treated as commas; the input is split on commas, each
    piece is stripped, pieces rejected by ``is_valid_token`` are dropped and
    the rest are wrapped with ``create_token``.
    """
    normalized = self.input.replace(";", ",")
    stripped_parts = (chunk.strip() for chunk in normalized.split(","))

    return [
        create_token(chunk, "")
        for chunk in stripped_parts
        if is_valid_token(chunk)
    ]
def split(self):
self.tokens = self.split_tokens()
result = []
self.accumulator = pd.Series({"address": "", "class": ""})
prev_token = create_token()
for cursor in self.tokens:
self.token = address_classification(cursor, prev_token)
prev_token = self.token.copy()
if self.accumulator["class"] == "":
self.accumulator = self.token.rename({"obj": "address"})
continue continue
if CLASSES.index(accumulator["class"][-1]) < CLASSES.index(cur_tk["class"][0]) and accumulator["class"]!="w": if self.correct_order():
accumulator["class"] += cur_tk['class'] self.accumulator["address"] += " "
accumulator["address"] += " " + cur_tk["obj"] self.accumulator += self.token.rename({"obj": "address"})
else: else:
ad_no_ranges = unfold_house_ranges(accumulator["address"]) unfolded_address = unfold_house_ranges(self.accumulator["address"])
accumulator["address"] = ad_no_ranges[-1] self.accumulator["address"] = unfolded_address[-1]
res.extend(ad_no_ranges) result.extend(unfolded_address)
accumulator = cut_address(accumulator, cur_tk["class"]) self.accumulator = cut_address(self.accumulator, self.token["class"])
if not accumulator["class"] or CLASSES.index(cur_tk["class"][0]) <= CLASSES.index("s") or accumulator["class"]=="w": if self.next_is_street_or_upper():
accumulator["class"] = cur_tk["class"] self.accumulator = self.token.rename({"obj": "address"})
accumulator["address"] = cur_tk["obj"]
if cur_tk["class"][0] == "h": if self.next_class_is("house"):
num = re.findall("\d{1,4} ?[\/\-]?\d* ?", cur_tk['obj'])[0] self.accumulator["address"] = self.substitue_house()
if any_of_in(("", "", ""), accumulator["address"]): self.pop_token_class()
idx = 1
else:
idx = 0
num_ac = re.findall("\d{1,4} ?[\/\-]?\d* ?", accumulator["address"])
if num_ac:
accumulator["address"] = re.sub(num_ac[idx], num, accumulator["address"])
cur_tk["class"] =cur_tk["class"][1:]
if cur_tk["class"] and cur_tk["class"][0] == "b": if self.next_class_is("building"):
num = re.findall("\d", cur_tk["obj"])[-1] self.insert_building()
if num and not "b" in accumulator["class"]: self.pop_token_class()
accumulator["class"] += "b"
accumulator["address"] += "к." + num
else:
accumulator["address"] = re.sub(r"\d$", num, accumulator["address"])
cur_tk["class"] = cur_tk["class"][1:]
if cur_tk["class"] and cur_tk["class"][0] == "e": if self.next_class_is("edifice"):
num = re.findall("стр\.? ?\d", cur_tk["obj"].strip())[-1] self.insert_edifice()
accumulator["address"] = re.sub(r"р\. ?\d", num, accumulator["address"].strip()) self.pop_token_class()
if num and not "e" in accumulator["class"]:
accumulator["class"] += "e"
cur_tk["class"] = cur_tk["class"][1:]
if cur_tk["class"] and cur_tk["class"][0] == "l": if self.next_class_is("letter"):
num = re.findall("[А-Яа-я]", cur_tk["obj"].strip())[-1] self.insert_letter()
accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip()) elif self.has_letter_in():
accumulator["address"] += num self.accumulator["address"] = self.without_letter()
if num and not "l" in accumulator["class"]:
accumulator["class"] += "l" if self.next_class_is("room"):
else: self.insert_room()
if re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", accumulator["address"]): self.pop_token_class()
accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip())
if cur_tk["class"] and cur_tk["class"][0] == "r": result.extend(unfold_house_ranges(self.accumulator["address"]))
num = re.findall("пом\. ?\-?\d*\w?", cur_tk["obj"].strip())[-1]
accumulator["address"] = re.sub(r"пом\. ?\d\-?\d*\w?", num, accumulator["address"].strip()) return result
if num and not "r" in accumulator["class"]:
accumulator["class"] += "r"
cur_tk["class"] = cur_tk["class"][1:]
res.extend(unfold_house_ranges(accumulator["address"]))
print(res)
return res
return [address]
def split_pesoch_res(address: str) -> List[str]: def split_pesoch_res(address: str) -> List[str]:
t = re.sub(r",", " ", address) t = re.sub(r",", " ", address)
@ -283,7 +408,7 @@ def process_row(row: pd.Series[str]) -> pd.Series[str]:
if row["РЭС"] == "Песочинский РЭС": if row["РЭС"] == "Песочинский РЭС":
addresses = split_pesoch_res(row["Улица"]) addresses = split_pesoch_res(row["Улица"])
else: else:
addresses = split_address(row["Улица"]) addresses = AddressSplitter(row["Улица"])
row["Улица"] = addresses row["Улица"] = addresses
return row return row

View File

@ -10,7 +10,7 @@ from . import (
def pipeline(parser: Optional[LenenergoParser] = None) -> LenenergoParser: def pipeline(parser: Optional[LenenergoParser] = None) -> LenenergoParser:
if parser is None: if parser is None:
parser = LenenergoParser(file_path = r"C:\Users\Юля\PycharmProjects\machine_learning\lenengro_parser\data_Rosseti.csv") parser = LenenergoParser(parser)
print(parser) print(parser)