Compare commits

...

6 Commits

Author SHA1 Message Date
931ff1270b
Fixed import errors in parser 2023-10-29 14:25:00 +03:00
cc2029802b
Split address.py to module 2023-10-29 14:24:39 +03:00
06f08d4933
Renamed pipeline file 2023-10-29 12:27:14 +03:00
e6af86703e
Applied formatter 2023-10-29 12:24:49 +03:00
662c2541db
Merge branch 'splitter_refactor' into dev 2023-10-29 12:19:47 +03:00
5722fc86fb
Rewrote split_address as a class AddressSplitter 2023-10-29 10:44:08 +03:00
8 changed files with 572 additions and 305 deletions

View File

@ -7,6 +7,7 @@ from .building_id import (
get_building_id, get_building_id,
) )
from .lenenergo import LenenergoParser from .lenenergo import LenenergoParser
from .pipeline import pipeline
from .preprocess import ( from .preprocess import (
COL_NS, COL_NS,
ICOL_NS, ICOL_NS,
@ -14,7 +15,6 @@ from .preprocess import (
preprocess_df, preprocess_df,
preprocess_read_df, preprocess_read_df,
) )
from .util import pipeline
__all__ = ( __all__ = (
"async_fetch_building_id", "async_fetch_building_id",

View File

@ -3,11 +3,11 @@ import time
import schedule import schedule
from . import pipeline from . import LenenergoParser, pipeline
def job(): def job():
parser = pipeline() parser = pipeline(LenenergoParser(file_path="./data.csv"))
parser.save_df(f'./data_{parser.today.strftime("%d-%m-%y_%H.%M")}.csv') parser.save_df(f'./data_{parser.today.strftime("%d-%m-%y_%H.%M")}.csv')

View File

@ -1,295 +0,0 @@
from __future__ import annotations
import re
from typing import Iterable, List, TypeVar
import pandas as pd
T = TypeVar("T")
CLASSES = ("w", "d", "c", "t", "s", "h", "b","e", "l", "r")
DISTRICTS_PREFIXES = ("мо ", "р","городское","лесхоз")
COUNTRYSIDE_PREFIXES = (
"г", "п", "д", "гп", "рп", "кп", "пгт", "c", "хутор", " урочище")
TERRITORY_PREFIXES = (
"тер.", " тер", "снт ", "ст ", "дск ", "днп ", "дпк ", "нп ", "пдк ", "т/б ", "садоводство", "массив", "хозя", "сад-во")
STREET_PREFIXES = (
" ул", " бул", " пр", " ш", " пер", " дор", " маг", " наб", " пл", " просп", " туп", "шоссе", "лини", "аллея",
"мост", " парк", "кольцо", "проезд", "съезд","переулок",
"ул.", "бул.", "пр.", "ш.", "пер.", "дор.", "маг.", "наб.", "пл.", "просп.", "туп.")
HOUSES_PREFIXES = ("д.", "уч.", "участок", "мкд", "тп","дом","дома")
BUILDING_PREFIXES = ("к.", "к ","корп", "корпус")
EDIFICE_PREFIXES=("стр.", "строение")
LETTER = ("лит.", "литера", " л.")
PREFIXES = (DISTRICTS_PREFIXES, COUNTRYSIDE_PREFIXES, TERRITORY_PREFIXES, STREET_PREFIXES, HOUSES_PREFIXES, BUILDING_PREFIXES, EDIFICE_PREFIXES,LETTER)
def unfold_house_ranges(token: str) -> List[str]:
addresses = []
pairs_strings = re.findall(r"([\d]+-[\d]+)", token)
for pair_string in pairs_strings:
a, b = pair_string.split("-")
a, b = int(a), int(b)
if b > a:
addresses += [re.sub(r"([\d]+-[\d]+)", number, token) for number in map(str, range(a, b + 1))]
else:
token = token.replace("-", "/")
if not addresses:
addresses.append(token)
return addresses
def any_of_in(substrings: Iterable[str], string: str) -> bool:
return any(map(lambda substring: substring in string, substrings))
def flatten(arr: Iterable[List[T]]) -> List[T]:
return sum(arr, [])
def find_room(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\bпом\.?", token['obj']):
return "r"
return ""
def find_litera(token: pd.Series, pre_token: pd.Series) -> str:
if find_room(token, pre_token):
return ""
if any_of_in(LETTER, token['obj'].lower()) \
or re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", token['obj']):
return "l"
if (re.search(r"\b([А-Я]|[а-я]){1}$", token['obj']) \
and ("l" in pre_token['class'] or "h" in pre_token['class'])) \
and not (" ш" in token["obj"]) \
and not find_countryside(token, pre_token):
return "l"
return ""
def find_edifice(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(EDIFICE_PREFIXES, token['obj'].lower()):
return "e"
return ""
def find_building(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\d", token['obj']) and not find_room(token,pre_token):
if any_of_in(BUILDING_PREFIXES, token['obj'].lower()) \
or "b" in pre_token['class'] and not ("h" in token['class']) and not find_edifice(token,pre_token)\
or re.search(r"к\.* ?\d", token['obj']):
return "b"
return ""
def find_house(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\d{1,4}", token['obj']) and not find_room(token,pre_token):
if any_of_in(HOUSES_PREFIXES, token['obj'].lower()):
return "h"
if re.search(r"(д|д\.) ?\d{1,4} ?\/*\d* ?", token['obj']):
return "h"
if ("s" in pre_token['class'] or "h" in pre_token['class'] or "s" in token['class']) \
and not any_of_in(("", "", ""), token['obj']) \
and not find_building(token, pre_token)\
and not find_edifice(token,pre_token):
return "h"
if find_building(token, pre_token) \
and not any_of_in(("", "", ""), token['obj']) \
and True:
if len(re.findall(r"\d{1,4}", token['obj'])) > 1:
return "h"
if int(re.search(r"\d{1,4}", token['obj']).group()) // 10 >0:
return "h"
return ""
def find_street(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(STREET_PREFIXES, token['obj'].lower()):
return "s"
if re.search(r"\b[А-Яа-я]{4,}\b", token['obj']) \
and not any([el in token["obj"].lower() for pr in PREFIXES for el in pr if len(el)>2]) \
and not ("d" in token["class"] or "t" in token["class"] or "c" in token["class"]):
return "s"
return ""
def find_territory(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(TERRITORY_PREFIXES, token['obj'].lower()):
return "t"
return ""
def find_countryside(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(COUNTRYSIDE_PREFIXES, token['obj'].lower()) \
and re.search(r"\b[гпдрпктc]{1,3}(\b|\. )", token['obj']) \
and not find_house(token, pre_token) \
and not any_of_in(STREET_PREFIXES, token['obj'].lower()):
return "c"
return ""
def find_district(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(DISTRICTS_PREFIXES, token['obj'].lower()):
return "d"
return ""
def address_classification(token: pd.Series, pre_token: pd.Series) -> pd.Series:
brackets = re.search(r"\(.+\)", token["obj"])
if brackets:
token["obj"] = re.sub(r"\(.+\)", "()", token["obj"])
token["class"] += find_district(token, pre_token)
token["class"] += find_countryside(token, pre_token)
token["class"] += find_territory(token, pre_token)
token["class"] += find_street(token, pre_token)
token["class"] += find_house(token, pre_token)
token["class"] += find_building(token, pre_token)
token["class"] += find_edifice(token, pre_token)
token["class"] += find_litera(token, pre_token)
token["class"] += find_room(token, pre_token)
if token['class'] == "":
token['class'] = "w"
if brackets:
token["obj"] = re.sub(r"\(\)", brackets.group(), token["obj"])
return token
def cut_address(ad: pd.Series, cl: str) -> pd.Series:
while ad["class"] and CLASSES.index(ad["class"][-1]) > CLASSES.index(cl[0]):
if ad["class"][-1] == "h":
ad["address"] = re.sub(r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?", "",
ad["address"].lower())
elif ad["class"][-1] == "b":
num = re.findall("к{0,1}\.? ?\d", ad["address"])[-1]
ad["address"] = re.sub(num, "", ad["address"])
elif ad["class"][-1] == "e":
ad["address"] = re.sub(r"р\.? ?\d", "", ad["address"])
elif ad["class"][-1] == "l":
ad["address"] = re.sub(r"[литера]*\.? ?[А-Яа-я]{1}$", "", ad["address"])
elif ad["class"][-1] == "r":
ad["address"] = re.sub(r"пом\.? ?\d+", "", ad["address"])
ad["class"] = ad["class"][:-1]
return ad
# TODO: переработать систему из if в нормальный вид
def split_address(address: str) -> List[str]:
if ";" in address:
address = address.replace(";", ",")
if "," in address:
tokens = address.split(",")
t = list(map(str.strip, filter(lambda token: token != "", tokens)))
tokens = pd.DataFrame()
tokens['obj'] = t
for el in ("", "уг.", "д."):
tokens = tokens[tokens["obj"] != el]
tokens.insert(len(tokens.columns), "class", "")
res = []
accumulator = pd.Series(data={"address": "", "class": ""})
for i in range(len(tokens)):
cur_tk = tokens.iloc[i]
if i == 0:
pre_token = pd.Series(data=["", ""], index=['obj', 'class'])
else:
pre_token = tokens.iloc[i - 1]
cur_tk = address_classification(cur_tk, pre_token)
tokens.iloc[i] = cur_tk
print(tokens.iloc[i])
if not accumulator["class"]:
accumulator["class"] = cur_tk['class']
accumulator["address"] = cur_tk["obj"]
continue
if CLASSES.index(accumulator["class"][-1]) < CLASSES.index(cur_tk["class"][0]) and accumulator["class"]!="w":
accumulator["class"] += cur_tk['class']
accumulator["address"] += " " + cur_tk["obj"]
else:
ad_no_ranges = unfold_house_ranges(accumulator["address"])
accumulator["address"] = ad_no_ranges[-1]
res.extend(ad_no_ranges)
accumulator = cut_address(accumulator, cur_tk["class"])
if not accumulator["class"] or CLASSES.index(cur_tk["class"][0]) <= CLASSES.index("s") or accumulator["class"]=="w":
accumulator["class"] = cur_tk["class"]
accumulator["address"] = cur_tk["obj"]
if cur_tk["class"][0] == "h":
num = re.findall("\d{1,4} ?[\/\-]?\d* ?", cur_tk['obj'])[0]
if any_of_in(("", "", ""), accumulator["address"]):
idx = 1
else:
idx = 0
num_ac = re.findall("\d{1,4} ?[\/\-]?\d* ?", accumulator["address"])
if num_ac:
accumulator["address"] = re.sub(num_ac[idx], num, accumulator["address"])
cur_tk["class"] =cur_tk["class"][1:]
if cur_tk["class"] and cur_tk["class"][0] == "b":
num = re.findall("\d", cur_tk["obj"])[-1]
if num and not "b" in accumulator["class"]:
accumulator["class"] += "b"
accumulator["address"] += "к." + num
else:
accumulator["address"] = re.sub(r"\d$", num, accumulator["address"])
cur_tk["class"] = cur_tk["class"][1:]
if cur_tk["class"] and cur_tk["class"][0] == "e":
num = re.findall("стр\.? ?\d", cur_tk["obj"].strip())[-1]
accumulator["address"] = re.sub(r"р\. ?\d", num, accumulator["address"].strip())
if num and not "e" in accumulator["class"]:
accumulator["class"] += "e"
cur_tk["class"] = cur_tk["class"][1:]
if cur_tk["class"] and cur_tk["class"][0] == "l":
num = re.findall("[А-Яа-я]", cur_tk["obj"].strip())[-1]
accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip())
accumulator["address"] += num
if num and not "l" in accumulator["class"]:
accumulator["class"] += "l"
else:
if re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", accumulator["address"]):
accumulator["address"] = re.sub(r"[А-Яа-я]$", "", accumulator["address"].strip())
if cur_tk["class"] and cur_tk["class"][0] == "r":
num = re.findall("пом\. ?\-?\d*\w?", cur_tk["obj"].strip())[-1]
accumulator["address"] = re.sub(r"пом\. ?\d\-?\d*\w?", num, accumulator["address"].strip())
if num and not "r" in accumulator["class"]:
accumulator["class"] += "r"
cur_tk["class"] = cur_tk["class"][1:]
res.extend(unfold_house_ranges(accumulator["address"]))
print(res)
return res
return [address]
def split_pesoch_res(address: str) -> List[str]:
t = re.sub(r",", " ", address)
t = re.split(r"(Санкт-Петербург|Ленинградская обл|Л\.О)", t)
t = list(map(str.strip, filter(lambda token: token != "", t)))
tokens = [t[i] + " " + t[i+1] for i in range(0, len(t)-1, 2)]
if tokens:
return list(set(tokens))
return [address]
def process_row(row: pd.Series[str]) -> pd.Series[str]:
row = row.copy()
if pd.isnull(row["Улица"]):
row["Улица"] = [None]
else:
if row["РЭС"] == "Песочинский РЭС":
addresses = split_pesoch_res(row["Улица"])
else:
addresses = split_address(row["Улица"])
row["Улица"] = addresses
return row
def split_addresses(df: pd.DataFrame) -> pd.DataFrame:
merged_df = df.apply(process_row, axis=1).reset_index()
return merged_df.explode("Улица", ignore_index=True)

View File

@ -0,0 +1,12 @@
from .classifier import CLASSES, address_classification
from .splitter import AddressSplitter, split_addresses, split_pesoch_res
from .utils import create_token
__all__ = (
"address_classification",
"AddressSplitter",
"CLASSES",
"create_token",
"split_addresses",
"split_pesoch_res",
)

View File

@ -0,0 +1,215 @@
import re
import pandas as pd
from .utils import any_of_in
CLASSES = ("w", "d", "c", "t", "s", "h", "b", "e", "l", "r")
DISTRICTS_PREFIXES = ("мо ", "р", "городское", "лесхоз")
COUNTRYSIDE_PREFIXES = (
"г",
"п",
"д",
"гп",
"рп",
"кп",
"пгт",
"c",
"хутор",
" урочище",
)
TERRITORY_PREFIXES = (
"тер.",
" тер",
"снт ",
"ст ",
"дск ",
"днп ",
"дпк ",
"нп ",
"пдк ",
"т/б ",
"садоводство",
"массив",
"хозя",
"сад-во",
)
STREET_PREFIXES = (
" ул",
" бул",
" пр",
" ш",
" пер",
" дор",
" маг",
" наб",
" пл",
" просп",
" туп",
"шоссе",
"лини",
"аллея",
"мост",
" парк",
"кольцо",
"проезд",
"съезд",
"переулок",
"ул.",
"бул.",
"пр.",
"ш.",
"пер.",
"дор.",
"маг.",
"наб.",
"пл.",
"просп.",
"туп.",
)
HOUSES_PREFIXES = ("д.", "уч.", "участок", "мкд", "тп", "дом", "дома")
BUILDING_PREFIXES = ("к.", "к ", "корп", "корпус")
EDIFICE_PREFIXES = ("стр.", "строение")
LETTER = ("лит.", "литера", " л.")
PREFIXES = (
DISTRICTS_PREFIXES,
COUNTRYSIDE_PREFIXES,
TERRITORY_PREFIXES,
STREET_PREFIXES,
HOUSES_PREFIXES,
BUILDING_PREFIXES,
EDIFICE_PREFIXES,
LETTER,
)
def find_room(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\bпом\.?", token["obj"]):
return "r"
return ""
def find_litera(token: pd.Series, pre_token: pd.Series) -> str:
if find_room(token, pre_token):
return ""
if any_of_in(LETTER, token["obj"].lower()) or re.search(
r"\d{1,3}([А-Я]|[а-я])( |$)", token["obj"]
):
return "l"
if (
(
re.search(r"\b([А-Я]|[а-я]){1}$", token["obj"])
and ("l" in pre_token["class"] or "h" in pre_token["class"])
)
and (" ш" not in token["obj"])
and not find_countryside(token, pre_token)
):
return "l"
return ""
def find_edifice(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(EDIFICE_PREFIXES, token["obj"].lower()):
return "e"
return ""
def find_building(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\d", token["obj"]) and not find_room(token, pre_token):
if (
any_of_in(BUILDING_PREFIXES, token["obj"].lower())
or "b" in pre_token["class"]
and ("h" not in token["class"])
and not find_edifice(token, pre_token)
or re.search(r"к\.* ?\d", token["obj"])
):
return "b"
return ""
def find_house(token: pd.Series, pre_token: pd.Series) -> str:
if re.search(r"\d{1,4}", token["obj"]) and not find_room(token, pre_token):
if any_of_in(HOUSES_PREFIXES, token["obj"].lower()):
return "h"
if re.search(r"(д|д\.) ?\d{1,4} ?\/*\d* ?", token["obj"]):
return "h"
if (
(
"s" in pre_token["class"]
or "h" in pre_token["class"]
or "s" in token["class"]
)
and not any_of_in(("", "", ""), token["obj"])
and not find_building(token, pre_token)
and not find_edifice(token, pre_token)
):
return "h"
if (
find_building(token, pre_token)
and not any_of_in(("", "", ""), token["obj"])
and True
):
if len(re.findall(r"\d{1,4}", token["obj"])) > 1:
return "h"
if int(re.search(r"\d{1,4}", token["obj"]).group()) // 10 > 0:
return "h"
return ""
def find_street(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(STREET_PREFIXES, token["obj"].lower()):
return "s"
if (
re.search(r"\b[А-Яа-я]{4,}\b", token["obj"])
and not any(
[el in token["obj"].lower() for pr in PREFIXES for el in pr if len(el) > 2]
)
and not (
"d" in token["class"] or "t" in token["class"] or "c" in token["class"]
)
):
return "s"
return ""
def find_territory(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(TERRITORY_PREFIXES, token["obj"].lower()):
return "t"
return ""
def find_countryside(token: pd.Series, pre_token: pd.Series) -> str:
if (
any_of_in(COUNTRYSIDE_PREFIXES, token["obj"].lower())
and re.search(r"\b[гпдрпктc]{1,3}(\b|\. )", token["obj"])
and not find_house(token, pre_token)
and not any_of_in(STREET_PREFIXES, token["obj"].lower())
):
return "c"
return ""
def find_district(token: pd.Series, pre_token: pd.Series) -> str:
if any_of_in(DISTRICTS_PREFIXES, token["obj"].lower()):
return "d"
return ""
def address_classification(token: pd.Series, pre_token: pd.Series) -> pd.Series:
brackets = re.search(r"\(.+\)", token["obj"])
if brackets:
token["obj"] = re.sub(r"\(.+\)", "()", token["obj"])
token["class"] += find_district(token, pre_token)
token["class"] += find_countryside(token, pre_token)
token["class"] += find_territory(token, pre_token)
token["class"] += find_street(token, pre_token)
token["class"] += find_house(token, pre_token)
token["class"] += find_building(token, pre_token)
token["class"] += find_edifice(token, pre_token)
token["class"] += find_litera(token, pre_token)
token["class"] += find_room(token, pre_token)
if token["class"] == "":
token["class"] = "w"
if brackets:
token["obj"] = re.sub(r"\(\)", brackets.group(), token["obj"])
return token

292
parser/address/splitter.py Normal file
View File

@ -0,0 +1,292 @@
from __future__ import annotations
import re
from collections.abc import Sequence
import pandas as pd
from .classifier import CLASSES, address_classification
from .utils import any_of_in, create_token, is_valid_token, unfold_house_ranges
class AddressSplitter(Sequence):
def __init__(self, address: str):
self.input = address
self.addresses = self.split()
# Sequence abstract methods implementation
def __getitem__(self, key: int):
if key < len(self.addresses):
return self.addresses[key]
else:
raise IndexError()
def __len__(self):
return len(self.addresses)
# Address token class manipulations
def next_class(self) -> str:
return self.token["class"][0]
def prev_class(self) -> str:
return self.accumulator["class"][-1]
def correct_order(self) -> bool:
return (
len(self.accumulator["class"]) > 0
and CLASSES.index(self.prev_class()) < CLASSES.index(self.next_class())
and self.accumulator["class"] != "w"
)
def next_class_is(self, comparing_class: str) -> bool:
return len(self.token["class"]) > 0 and self.next_class() == comparing_class[0]
def has_no_class(self, comparing_class: str) -> bool:
return comparing_class[0] not in self.accumulator["class"]
def pop_token_class(self):
self.token["class"] = self.token["class"][1:]
# Accumulator constrains
def next_is_street_or_upper(self) -> bool:
is_unknown_class = self.accumulator["class"] in ("", "w")
return (
CLASSES.index(self.next_class()) <= CLASSES.index("s") or is_unknown_class
)
def has_numbered_street(self) -> bool:
return any_of_in(("", "", ""), self.accumulator["address"])
# Accumulator manipulation
## House
def substitue_house(self) -> str:
house_regex = re.compile(r"\d{1,4} ?[\/\-]?\d* ?")
number = house_regex.findall(self.token["obj"])[0]
if self.has_numbered_street():
house_number_index = 1
else:
house_number_index = 0
number_in_accumulator = house_regex.findall(self.accumulator["address"])
if number_in_accumulator:
return re.sub(
number_in_accumulator[house_number_index],
number,
self.accumulator["address"],
)
else:
return self.accumulator["address"]
## Building
def append_building(self, number: int) -> pd.Series:
self.accumulator["class"] += "b"
self.accumulator["address"] += "к." + number
return self.accumulator
def substitue_building(self, number: int) -> str:
return re.sub(r"\d$", number, self.accumulator["address"])
def insert_building(self):
number = re.findall(r"\d", self.token["obj"])[-1]
if number and self.has_no_class("building"):
self.accumulator = self.append_building(number)
else:
self.accumulator["address"] = self.substitue_building(number)
## Edifice
def substitue_edifice(self, number: int) -> str:
return re.sub(r"р\. ?\d", number, self.accumulator["address"].strip())
def insert_edifice(self):
number = re.findall("стр\.? ?\d", self.token["obj"])[-1]
self.accumulator["address"] = self.substitue_edifice(number)
if number and self.has_no_class("edifice"):
self.accumulator["class"] += "e"
## Letter
def without_letter(self) -> str:
return re.sub(r"[А-Яа-я]$", "", self.accumulator["address"].strip())
def substitue_letter(self, letter: str) -> str:
address_without_letter = self.without_letter()
return address_without_letter + letter
def insert_letter(self):
letter = re.findall(r"[А-Яа-я]", self.token["obj"])[-1]
self.accumulator["address"] = self.substitue_letter(letter)
if letter and self.has_no_class("litera"):
self.accumulator["class"] += "l"
def has_letter_in(self) -> bool:
return re.search(r"\d{1,3}([А-Я]|[а-я])( |$)", self.accumulator["address"])
## Room
def substitue_room(self, number: int) -> str:
return re.sub(
r"пом\. ?\d\-?\d*\w?", number, self.accumulator["address"].strip()
)
def insert_room(self):
number = re.findall("пом\. ?\-?\d*\w?", self.token["obj"])[-1]
self.accumulator["address"] = self.substitue_room(number)
if number and self.has_no_class("room"):
self.accumulator["class"] += "r"
# Data preprocessing
def split_tokens(self) -> list[pd.Series]:
address = self.input.replace(";", ",")
parts = address.split(",")
parts = map(str.strip, parts)
parts = filter(is_valid_token, parts)
tokens = map(lambda part: create_token(part, ""), parts)
return list(tokens)
def cut_address(self) -> pd.Series:
while len(self.accumulator["class"]) > 0 and CLASSES.index(
self.prev_class()
) > CLASSES.index(self.next_class()):
match self.accumulator["class"][-1]:
case "h":
self.accumulator["addresses"] = re.sub(
r"[мкдтпучасток]*\.? ?\d{1,4} ?\/*\d* ?",
"",
self.accumulator["address"].lower(),
)
case "b":
number = re.findall(r"к{0,1}\.? ?\d", self.accumulator["address"])[
-1
]
self.accumulator["address"] = re.sub(
number, "", self.accumulator["address"]
)
case "e":
self.accumulator["address"] = re.sub(
r"р\.? ?\d", "", self.accumulator["address"]
)
case "l":
self.accumulator["address"] = re.sub(
r"[литера]*\.? ?[А-Яа-я]{1}$", "", self.accumulator["address"]
)
case "r":
self.accumulator["address"] = re.sub(
r"пом\.? ?\d+", "", self.accumulator["address"]
)
self.accumulator["class"] = self.accumulator["class"][:-1]
return self.accumulator
# Splitting
def split(self):
self.tokens = self.split_tokens()
result = []
self.accumulator = pd.Series({"address": "", "class": ""})
prev_token = create_token()
for cursor in self.tokens:
self.token = address_classification(cursor, prev_token)
prev_token = self.token.copy()
if self.accumulator["class"] == "":
self.accumulator = self.token.rename({"obj": "address"})
continue
if self.correct_order():
self.accumulator["address"] += " "
self.accumulator += self.token.rename({"obj": "address"})
else:
unfolded_address = unfold_house_ranges(self.accumulator["address"])
self.accumulator["address"] = unfolded_address[-1]
result.extend(unfolded_address)
self.accumulator = self.cut_address()
if self.next_is_street_or_upper():
self.accumulator = self.token.rename({"obj": "address"})
if self.next_class_is("house"):
self.accumulator["address"] = self.substitue_house()
self.pop_token_class()
if self.next_class_is("building"):
self.insert_building()
self.pop_token_class()
if self.next_class_is("edifice"):
self.insert_edifice()
self.pop_token_class()
if self.next_class_is("letter"):
self.insert_letter()
elif self.has_letter_in():
self.accumulator["address"] = self.without_letter()
if self.next_class_is("room"):
self.insert_room()
self.pop_token_class()
result.extend(unfold_house_ranges(self.accumulator["address"]))
return result
def split_pesoch_res(address: str) -> list[str]:
t = re.sub(r",", " ", address)
t = re.split(r"(Санкт-Петербург|Ленинградская обл|Л\.О)", t)
t = list(map(str.strip, filter(lambda token: token != "", t)))
tokens = [t[i] + " " + t[i + 1] for i in range(0, len(t) - 1, 2)]
if tokens:
return list(set(tokens))
return [address]
def process_row(row: pd.Series[str]) -> pd.Series[str]:
row = row.copy()
if pd.isnull(row["Улица"]):
row["Улица"] = [None]
else:
if row["РЭС"] == "Песочинский РЭС":
addresses = split_pesoch_res(row["Улица"])
else:
addresses = AddressSplitter(row["Улица"])
row["Улица"] = addresses
return row
def split_addresses(df: pd.DataFrame) -> pd.DataFrame:
merged_df = df.apply(process_row, axis=1).reset_index()
return merged_df.explode("Улица", ignore_index=True)

45
parser/address/utils.py Normal file
View File

@ -0,0 +1,45 @@
import re
from collections.abc import Iterable
from typing import TypeVar
import pandas as pd
T = TypeVar("T")
def any_of_in(substrings: Iterable[str], string: str) -> bool:
return any(map(lambda substring: substring in string, substrings))
def flatten(arr: Iterable[list[T]]) -> list[T]:
return sum(arr, [])
def unfold_house_ranges(token: str) -> list[str]:
addresses = []
pairs_strings = re.findall(r"([\d]+-[\d]+)", token)
for pair_string in pairs_strings:
a, b = pair_string.split("-")
a, b = int(a), int(b)
if b > a:
addresses += [
re.sub(r"([\d]+-[\d]+)", number, token)
for number in map(str, range(a, b + 1))
]
else:
token = token.replace("-", "/")
if not addresses:
addresses.append(token)
return addresses
def is_valid_token(string: str) -> bool:
return string not in ("", "уг.", "д.")
def create_token(obj: str = "", token_class: str = ""):
return pd.Series(
{
"obj": obj,
"class": token_class,
}
)

View File

@ -1,16 +1,14 @@
from typing import Optional from typing import Optional
from . import ( from .lenenergo import LenenergoParser
LenenergoParser, from .building_id import concurrent_fetch_builing_ids
concurrent_fetch_builing_ids, from .preprocess import preprocess_df
preprocess_df, from .address import split_addresses
split_addresses,
)
def pipeline(parser: Optional[LenenergoParser] = None) -> LenenergoParser: def pipeline(parser: Optional[LenenergoParser] = None) -> LenenergoParser:
if parser is None: if parser is None:
parser = LenenergoParser(file_path = r"C:\Users\Юля\PycharmProjects\machine_learning\lenengro_parser\data_Rosseti.csv") parser = LenenergoParser()
print(parser) print(parser)