style: use black as formatter

This commit is contained in:
NateScarlet 2021-09-22 19:22:48 +08:00
parent 5ef525db94
commit c79acdc39d
No known key found for this signature in database
GPG Key ID: 5C242793B070309C
3 changed files with 115 additions and 104 deletions

View File

@ -11,10 +11,10 @@ from typing import Iterator, List, Optional, Tuple
import bs4 import bs4
import requests import requests
SEARCH_URL = 'http://sousuo.gov.cn/s.htm' SEARCH_URL = "http://sousuo.gov.cn/s.htm"
PAPER_EXCLUDE = [ PAPER_EXCLUDE = [
'http://www.gov.cn/zhengce/content/2014-09/29/content_9102.htm', "http://www.gov.cn/zhengce/content/2014-09/29/content_9102.htm",
'http://www.gov.cn/zhengce/content/2015-02/09/content_9466.htm', "http://www.gov.cn/zhengce/content/2015-02/09/content_9466.htm",
] ]
@ -28,16 +28,20 @@ def get_paper_urls(year: int) -> List[str]:
List[str]: Urls oldest first. List[str]: Urls oldest first.
""" """
body = requests.get(SEARCH_URL, params={ body = requests.get(
't': 'paper', SEARCH_URL,
'advance': 'true', params={
'title': year, "t": "paper",
'q': '假期', "advance": "true",
'pcodeJiguan': '国办发明电', "title": year,
'puborg': '国务院办公厅' "q": "假期",
}).text "pcodeJiguan": "国办发明电",
"puborg": "国务院办公厅",
},
).text
ret = re.findall( ret = re.findall(
r'<li class="res-list".*?<a href="(.+?)".*?</li>', body, flags=re.S) r'<li class="res-list".*?<a href="(.+?)".*?</li>', body, flags=re.S
)
ret = [i for i in ret if i not in PAPER_EXCLUDE] ret = [i for i in ret if i not in PAPER_EXCLUDE]
ret.sort() ret.sort()
return ret return ret
@ -53,16 +57,17 @@ def get_paper(url: str) -> str:
str: Extracted paper text. str: Extracted paper text.
""" """
assert re.match(r'http://www.gov.cn/zhengce/content/\d{4}-\d{2}/\d{2}/content_\d+.htm', assert re.match(
url), 'Site changed, need human verify' r"http://www.gov.cn/zhengce/content/\d{4}-\d{2}/\d{2}/content_\d+.htm", url
), "Site changed, need human verify"
response = requests.get(url) response = requests.get(url)
response.encoding = 'utf-8' response.encoding = "utf-8"
soup = bs4.BeautifulSoup(response.text, features='html.parser') soup = bs4.BeautifulSoup(response.text, features="html.parser")
container = soup.find('td', class_='b12c') container = soup.find("td", class_="b12c")
assert container, f'Can not get paper container from url: {url}' assert container, f"Can not get paper container from url: {url}"
ret = container.get_text().replace('\u3000\u3000', '\n') ret = container.get_text().replace("\u3000\u3000", "\n")
assert ret, f'Can not get paper context from url: {url}' assert ret, f"Can not get paper context from url: {url}"
return ret return ret
@ -99,7 +104,7 @@ def get_normal_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
Iterator[Tuple[str, str]]: (name, description) Iterator[Tuple[str, str]]: (name, description)
""" """
for i in lines: for i in lines:
match = re.match(r'[一二三四五六七八九十]、(.+?)(.+)', i) match = re.match(r"[一二三四五六七八九十]、(.+?)(.+)", i)
if match: if match:
yield match.groups() yield match.groups()
@ -115,16 +120,16 @@ def get_patch_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
""" """
name = None name = None
for i in lines: for i in lines:
match = re.match(r'.*\d+年([^和、]{2,})(?:假期|放假).*安排', i) match = re.match(r".*\d+年([^和、]{2,})(?:假期|放假).*安排", i)
if match: if match:
name = match.group(1) name = match.group(1)
if not name: if not name:
continue continue
match = re.match(r'^[一二三四五六七八九十]、(.+)$', i) match = re.match(r"^[一二三四五六七八九十]、(.+)$", i)
if not match: if not match:
continue continue
description = match.group(1) description = match.group(1)
if re.match(r'.*\d+月\d+日.*', description): if re.match(r".*\d+月\d+日.*", description):
yield name, description yield name, description
@ -133,7 +138,7 @@ def _cast_int(value):
class DescriptionParser: class DescriptionParser:
"""Parser for holiday shift description. """ """Parser for holiday shift description."""
def __init__(self, description: str, year: int): def __init__(self, description: str, year: int):
self.description = description self.description = description
@ -148,7 +153,7 @@ class DescriptionParser:
""" """
del self.date_history[:] del self.date_history[:]
for i in re.split('[,。;]', self.description): for i in re.split("[,。;]", self.description):
for j in SentenceParser(self, i).parse(): for j in SentenceParser(self, i).parse():
yield j yield j
@ -167,17 +172,19 @@ class DescriptionParser:
date: Date result date: Date result
""" """
assert day, 'No day specified' assert day, "No day specified"
# Special case: month inherit # Special case: month inherit
if month is None: if month is None:
month = self.date_history[-1].month month = self.date_history[-1].month
# Special case: 12 month may mean previous year # Special case: 12 month may mean previous year
if (year is None if (
year is None
and month == 12 and month == 12
and self.date_history and self.date_history
and max(self.date_history) < date(year=self.year, month=2, day=1)): and max(self.date_history) < date(year=self.year, month=2, day=1)
):
year = self.year - 1 year = self.year - 1
year = year or self.year year = year or self.year
@ -185,10 +192,10 @@ class DescriptionParser:
class SentenceParser: class SentenceParser:
"""Parser for holiday shift description sentence. """ """Parser for holiday shift description sentence."""
special_cases = { special_cases = {
'延长2020年春节假期至2月2日农历正月初九': [ "延长2020年春节假期至2月2日农历正月初九": [
{"date": date(2020, 1, 31), "isOffDay": True}, {"date": date(2020, 1, 31), "isOffDay": True},
{"date": date(2020, 2, 1), "isOffDay": True}, {"date": date(2020, 2, 1), "isOffDay": True},
{"date": date(2020, 2, 2), "isOffDay": True}, {"date": date(2020, 2, 2), "isOffDay": True},
@ -210,8 +217,10 @@ class SentenceParser:
""" """
count = 0 count = 0
text = text.replace('(', '').replace(')', '') text = text.replace("(", "").replace(")", "")
for i in chain(*(method(self, text) for method in self.date_extraction_methods)): for i in chain(
*(method(self, text) for method in self.date_extraction_methods)
):
count += 1 count += 1
is_seen = i in self.parent.date_history is_seen = i in self.parent.date_history
self.parent.date_history.append(i) self.parent.date_history.append(i)
@ -223,7 +232,7 @@ class SentenceParser:
raise NotImplementedError(text) raise NotImplementedError(text)
def _extract_dates_1(self, value: str) -> Iterator[date]: def _extract_dates_1(self, value: str) -> Iterator[date]:
match = re.findall(r'(?:(\d+)年)?(?:(\d+)月)?(\d+)日', value) match = re.findall(r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日", value)
for groups in match: for groups in match:
groups = [_cast_int(i) for i in groups] groups = [_cast_int(i) for i in groups]
assert len(groups) == 3, groups assert len(groups) == 3, groups
@ -231,33 +240,31 @@ class SentenceParser:
def _extract_dates_2(self, value: str) -> Iterator[date]: def _extract_dates_2(self, value: str) -> Iterator[date]:
match = re.findall( match = re.findall(
r'(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:至|-|—)(?:(\d+)年)?(?:(\d+)月)?(\d+)日', value) r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:至|-|—)(?:(\d+)年)?(?:(\d+)月)?(\d+)日", value
)
for groups in match: for groups in match:
groups = [_cast_int(i) for i in groups] groups = [_cast_int(i) for i in groups]
assert len(groups) == 6, groups assert len(groups) == 6, groups
start = self.parent.get_date(year=groups[0], start = self.parent.get_date(year=groups[0], month=groups[1], day=groups[2])
month=groups[1], day=groups[2]) end = self.parent.get_date(year=groups[3], month=groups[4], day=groups[5])
end = self.parent.get_date(year=groups[3],
month=groups[4], day=groups[5])
for i in range((end - start).days + 1): for i in range((end - start).days + 1):
yield start + timedelta(days=i) yield start + timedelta(days=i)
def _extract_dates_3(self, value: str) -> Iterator[date]: def _extract_dates_3(self, value: str) -> Iterator[date]:
match = re.findall( match = re.findall(
r'(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:[^]+)?' r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:[^]+)?"
r'(?:、(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:[^]+)?)+', r"(?:、(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:[^]+)?)+",
value) value,
)
for groups in match: for groups in match:
groups = [_cast_int(i) for i in groups] groups = [_cast_int(i) for i in groups]
assert not (len(groups) % 3), groups assert not (len(groups) % 3), groups
for i in range(0, len(groups), 3): for i in range(0, len(groups), 3):
yield self.parent.get_date(year=groups[i], month=groups[i+1], day=groups[i+2]) yield self.parent.get_date(
year=groups[i], month=groups[i + 1], day=groups[i + 2]
)
date_extraction_methods = [ date_extraction_methods = [_extract_dates_1, _extract_dates_2, _extract_dates_3]
_extract_dates_1,
_extract_dates_2,
_extract_dates_3
]
def parse(self) -> Iterator[dict]: def parse(self) -> Iterator[dict]:
"""Parse days with memory """Parse days with memory
@ -273,36 +280,24 @@ class SentenceParser:
yield i yield i
def _parse_rest_1(self): def _parse_rest_1(self):
match = re.match(r'(.+)(放假|补休|调休|公休)+(?:\d+天)?$', self.sentence) match = re.match(r"(.+)(放假|补休|调休|公休)+(?:\d+天)?$", self.sentence)
if match: if match:
for i in self.extract_dates(match.group(1)): for i in self.extract_dates(match.group(1)):
yield { yield {"date": i, "isOffDay": True}
'date': i,
'isOffDay': True
}
def _parse_work_1(self): def _parse_work_1(self):
match = re.match('(.+)上班$', self.sentence) match = re.match("(.+)上班$", self.sentence)
if match: if match:
for i in self.extract_dates(match.group(1)): for i in self.extract_dates(match.group(1)):
yield { yield {"date": i, "isOffDay": False}
'date': i,
'isOffDay': False
}
def _parse_shift_1(self): def _parse_shift_1(self):
match = re.match('(.+)调至(.+)', self.sentence) match = re.match("(.+)调至(.+)", self.sentence)
if match: if match:
for i in self.extract_dates(match.group(1)): for i in self.extract_dates(match.group(1)):
yield { yield {"date": i, "isOffDay": False}
'date': i,
'isOffDay': False
}
for i in self.extract_dates(match.group(2)): for i in self.extract_dates(match.group(2)):
yield { yield {"date": i, "isOffDay": True}
'date': i,
'isOffDay': True
}
def _parse_special(self): def _parse_special(self):
for i in self.special_cases.get(self.sentence, []): for i in self.special_cases.get(self.sentence, []):
@ -328,49 +323,50 @@ def parse_paper(year: int, url: str) -> Iterator[dict]:
""" """
paper = get_paper(url) paper = get_paper(url)
rules = get_rules(paper) rules = get_rules(paper)
ret = ({'name': name, **i} ret = (
{"name": name, **i}
for name, description in rules for name, description in rules
for i in DescriptionParser(description, year).parse()) for i in DescriptionParser(description, year).parse()
)
try: try:
for i in ret: for i in ret:
yield i yield i
except NotImplementedError as ex: except NotImplementedError as ex:
raise RuntimeError('Can not parse paper', url) from ex raise RuntimeError("Can not parse paper", url) from ex
def fetch_holiday(year: int): def fetch_holiday(year: int):
"""Fetch holiday data. """ """Fetch holiday data."""
papers = get_paper_urls(year) papers = get_paper_urls(year)
days = dict() days = dict()
for k in (j for k in (j for i in papers for j in parse_paper(year, i)):
for i in papers days[k["date"]] = k
for j in parse_paper(year, i)):
days[k['date']] = k
return { return {
'year': year, "year": year,
'papers': papers, "papers": papers,
'days': sorted(days.values(), key=lambda x: x['date']) "days": sorted(days.values(), key=lambda x: x["date"]),
} }
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('year', type=int) parser.add_argument("year", type=int)
args = parser.parse_args() args = parser.parse_args()
year = args.year year = args.year
print(json.dumps(fetch_holiday(year), print(
indent=4, json.dumps(
ensure_ascii=False, fetch_holiday(year), indent=4, ensure_ascii=False, cls=CustomJSONEncoder
cls=CustomJSONEncoder)) )
)
class CustomJSONEncoder(json.JSONEncoder): class CustomJSONEncoder(json.JSONEncoder):
"""Custom json encoder. """ """Custom json encoder."""
def default(self, o): def default(self, o):
# pylint:disable=method-hidden # pylint:disable=method-hidden
@ -380,5 +376,5 @@ class CustomJSONEncoder(json.JSONEncoder):
return super().default(o) return super().default(o)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View File

@ -1,6 +1,7 @@
"""Tools for files. """ """Tools for files. """
import os import os
__dirname__ = os.path.abspath(os.path.dirname(__file__)) __dirname__ = os.path.abspath(os.path.dirname(__file__))

View File

@ -3,40 +3,54 @@ import json
import pytest import pytest
from fetch_holidays import (CustomJSONEncoder, DescriptionParser, get_paper, from fetch_holidays import (
get_paper_urls, get_rules) CustomJSONEncoder,
DescriptionParser,
get_paper,
get_paper_urls,
get_rules,
)
from .filetools import _file_path from .filetools import _file_path
def test_get_paper_urls(): def test_get_paper_urls():
assert get_paper_urls(2019) == [ assert get_paper_urls(2019) == [
'http://www.gov.cn/zhengce/content/2018-12/06/content_5346276.htm', "http://www.gov.cn/zhengce/content/2018-12/06/content_5346276.htm",
'http://www.gov.cn/zhengce/content/2019-03/22/content_5375877.htm', "http://www.gov.cn/zhengce/content/2019-03/22/content_5375877.htm",
] ]
def test_get_rules(): def test_get_rules():
assert ( assert list(
list(get_rules(get_paper( get_rules(
'http://www.gov.cn/zhengce/content/2019-03/22/content_5375877.htm'))) get_paper(
== [('劳动节', "http://www.gov.cn/zhengce/content/2019-03/22/content_5375877.htm"
'2019年5月1日至4日放假调休共4天。4月28日星期日、5月5日星期日上班。')]) )
)
) == [("劳动节", "2019年5月1日至4日放假调休共4天。4月28日星期日、5月5日星期日上班。")]
def _normalize(iterable): def _normalize(iterable):
return sorted(json.loads(json.dumps(list(iterable), cls=CustomJSONEncoder)), return sorted(
key=lambda x: x['date']) json.loads(json.dumps(list(iterable), cls=CustomJSONEncoder)),
key=lambda x: x["date"],
)
def _description_parsing_cases(): def _description_parsing_cases():
with open(_file_path('description_parsing_cases.json'), 'r', encoding='utf-8', ) as f: with open(
_file_path("description_parsing_cases.json"),
"r",
encoding="utf-8",
) as f:
return json.load(f) return json.load(f)
@pytest.mark.parametrize('case', _description_parsing_cases()) @pytest.mark.parametrize("case", _description_parsing_cases())
def test_parse_description(case): def test_parse_description(case):
year, description, expected = case['year'], case['description'], case['expected'] year, description, expected = case["year"], case["description"], case["expected"]
assert _normalize(DescriptionParser( assert _normalize(DescriptionParser(description, year).parse()) == _normalize(
description, year).parse()) == _normalize(expected), case expected
), case