374 lines
10 KiB
Python
Executable File
374 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""Fetch holidays from gov.cn """
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
from datetime import date, timedelta
|
||
from itertools import chain
|
||
from typing import Iterator, List, Optional, Tuple
|
||
|
||
import bs4
|
||
import requests
|
||
|
||
SEARCH_URL = 'http://sousuo.gov.cn/s.htm'
|
||
PAPER_EXCLUDE = [
|
||
'http://www.gov.cn/zhengce/content/2014-09/29/content_9102.htm',
|
||
'http://www.gov.cn/zhengce/content/2015-02/09/content_9466.htm',
|
||
]
|
||
|
||
|
||
def get_paper_urls(year: int) -> List[str]:
    """Find year related paper urls via the gov.cn search service.

    Args:
        year (int): eg. 2018

    Returns:
        List[str]: Urls, oldest first (the urls embed the publish date,
            e.g. `.../content/2014-09/29/...`, so a plain lexicographic
            sort is chronological).
    """

    # A timeout keeps the script from hanging forever on a stalled
    # connection; requests has no default timeout.
    body = requests.get(SEARCH_URL, params={
        't': 'paper',
        'advance': 'true',
        'title': year,
        'q': '假期',
        'pcodeJiguan': '国办发明电',
        'puborg': '国务院办公厅'
    }, timeout=30).text
    # Each search hit is rendered as <li class="res-list">...<a href="...">.
    ret = re.findall(
        r'<li class="res-list".*?<a href="(.+?)".*?</li>', body, flags=re.S)
    # Drop papers known to be irrelevant / superseded.
    ret = [i for i in ret if i not in PAPER_EXCLUDE]
    ret.sort()
    return ret
|
||
|
||
|
||
def get_paper(url: str) -> str:
    """Download one policy paper page and return its plain text.

    Args:
        url (str): Paper url.

    Returns:
        str: Extracted paper text.
    """

    expected = r'http://www.gov.cn/zhengce/content/\d{4}-\d{2}/\d{2}/content_\d+.htm'
    assert re.match(expected, url), 'Site changed, need human verify'

    resp = requests.get(url)
    resp.encoding = 'utf-8'
    page = bs4.BeautifulSoup(resp.text, features='html.parser')
    cell = page.find('td', class_='b12c')
    assert cell, f'Can not get paper container from url: {url}'
    # Fullwidth double-space marks paragraph starts on gov.cn pages.
    text = cell.get_text().replace('\u3000\u3000', '\n')
    assert text, f'Can not get paper context from url: {url}'
    return text
|
||
|
||
|
||
def get_rules(paper: str) -> Iterator[Tuple[str, str]]:
    """Extract holiday rules from paper text.

    Args:
        paper (str): Paper text

    Raises:
        NotImplementedError: When find no rules.

    Returns:
        Iterator[Tuple[str, str]]: (name, description)
    """

    raw_lines: list = paper.splitlines()
    # Deduplicate while keeping first-seen order.
    lines = sorted(set(raw_lines), key=raw_lines.index)
    found = False
    for rule in chain(get_normal_rules(lines), get_patch_rules(lines)):
        found = True
        yield rule
    if not found:
        raise NotImplementedError(lines)
|
||
|
||
|
||
def get_normal_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
    """Get normal holiday rule for a year.

    A rule line looks like `一、<name>:<description>`.

    Args:
        lines (Iterator[str]): paper content

    Returns:
        Iterator[Tuple[str, str]]: (name, description)
    """

    pattern = re.compile(r'[一二三四五六七八九十]、(.+?):(.+)')
    for line in lines:
        found = pattern.match(line)
        if found is not None:
            yield found.groups()
|
||
|
||
|
||
def get_patch_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
    """Get holiday patch rule for existed holiday.

    Patch papers state the holiday name in a title line
    (`...<year>年<name>假期安排...`), then list numbered items that
    contain concrete dates.

    Args:
        lines (Iterator[str]): paper content

    Returns:
        Iterator[Tuple[str, str]]: (name, description)
    """

    title_re = re.compile(r'.*\d+年(.{2,})(?:假期|放假)安排.*')
    item_re = re.compile(r'^[一二三四五六七八九十]、(.+)$')
    date_re = re.compile(r'.*\d+月\d+日.*')

    holiday_name = None
    for line in lines:
        title_match = title_re.match(line)
        if title_match:
            holiday_name = title_match.group(1)
        # Items before any title line have no holiday to attach to.
        if not holiday_name:
            continue
        item_match = item_re.match(line)
        if item_match is None:
            continue
        body = item_match.group(1)
        # Only items that mention a concrete date are patch rules.
        if date_re.match(body):
            yield holiday_name, body
|
||
|
||
|
||
def _cast_int(value):
|
||
return int(value) if value else None
|
||
|
||
|
||
class DescriptionParser:
    """Parser for holiday shift description. """

    def __init__(self, description: str, year: int):
        self.description = description
        self.year = year
        # Dates already produced, in emission order.  Shared with the
        # sentence parsers for month inheritance, year rollover and
        # duplicate suppression.
        self.date_history = list()

    def parse(self) -> Iterator[dict]:
        """Yield one day dict per date parsed from the description.

        Raises:
            NotImplementedError: When no date at all could be extracted.
        """

        self.date_history.clear()
        sentences = re.split('[,。;]', self.description)
        for sentence in sentences:
            yield from SentenceParser(self, sentence).parse()

        if not self.date_history:
            raise NotImplementedError(self.description)

    def get_date(self, year: Optional[int], month: Optional[int], day: int) -> date:
        """Resolve a possibly-partial (year, month, day) against context.

        Args:
            year (Optional[int]): year, or None to take it from context
            month (Optional[int]): month, or None to inherit the last seen one
            day (int): day

        Returns:
            date: Date result
        """

        assert day, 'No day specified'

        # Month omitted: inherit from the most recently produced date.
        if month is None:
            month = self.date_history[-1].month

        # A bare December date seen while all known dates are before
        # Feb 1st refers to the previous year (New Year holidays span
        # the year boundary).
        if (year is None
                and month == 12
                and self.date_history
                and max(self.date_history) < date(year=self.year, month=2, day=1)):
            year = self.year - 1

        return date(year=year or self.year, month=month, day=day)
|
||
|
||
|
||
class SentenceParser:
    """Parser for holiday shift description sentence.

    Sentences come from `DescriptionParser.parse`, which splits the full
    description on Chinese punctuation before handing each piece here.
    """

    def __init__(self, parent: DescriptionParser, sentence):
        # The parent supplies year context, `get_date` resolution and the
        # shared `date_history` used for inference and dedup.
        self.parent = parent
        self.sentence = sentence

    def extract_dates(self, text: str) -> Iterator[date]:
        """Extract date from text.

        Every extraction method runs over the same text.  Each found date
        is appended to the parent's `date_history` (repeats included, so
        context lookups stay current), but only first-seen dates are
        yielded.

        Args:
            text (str): Text to extract

        Raises:
            NotImplementedError: When no method finds any date.

        Returns:
            Iterator[date]: Extracted dates.
        """

        count = 0
        # Normalize fullwidth parentheses so the regexes below only need
        # to handle the ASCII form.
        text = text.replace('(', '(').replace(')', ')')
        for i in chain(*(method(self, text) for method in self.date_extraction_methods)):
            count += 1
            is_seen = i in self.parent.date_history
            self.parent.date_history.append(i)
            if is_seen:
                continue
            yield i

        if not count:
            raise NotImplementedError(text)

    def _extract_dates_1(self, value: str) -> Iterator[date]:
        # Single dates such as "5月1日"; year and month are optional and
        # resolved from context by `parent.get_date`.
        match = re.findall(r'(?:(\d+)年)?(?:(\d+)月)?(\d+)日', value)
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert len(groups) == 3, groups
            yield self.parent.get_date(year=groups[0], month=groups[1], day=groups[2])

    def _extract_dates_2(self, value: str) -> Iterator[date]:
        # Date ranges such as "1月1日至1月3日": expand to every day of the
        # inclusive range.
        match = re.findall(
            r'(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:至|-|—)(?:(\d+)年)?(?:(\d+)月)?(\d+)日', value)
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert len(groups) == 6, groups
            start = self.parent.get_date(year=groups[0],
                                         month=groups[1], day=groups[2])
            end = self.parent.get_date(year=groups[3],
                                       month=groups[4], day=groups[5])
            for i in range((end - start).days + 1):
                yield start + timedelta(days=i)

    def _extract_dates_3(self, value: str) -> Iterator[date]:
        # Enumerated dates such as "1月1日、1月2日".
        # NOTE(review): this pattern appears to contain capture groups for
        # the `([^)]+)` annotations as well, which would make the group
        # count not a multiple of 3 and trip the assert below whenever it
        # matches; `_extract_dates_1` already yields every individual date
        # and dedup hides overlaps — verify against real paper text.
        match = re.findall(
            r'(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:([^)]+))?'
            r'(?:、(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:([^)]+))?)+',
            value)
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert not (len(groups) % 3), groups
            for i in range(0, len(groups), 3):
                yield self.parent.get_date(year=groups[i], month=groups[i+1], day=groups[i+2])

    # Plain functions (not bound methods): invoked as `method(self, text)`.
    # Order matters — the dedup in `extract_dates` depends on which method
    # reports a date first.
    date_extraction_methods = [
        _extract_dates_1,
        _extract_dates_2,
        _extract_dates_3
    ]

    def parse(self) -> Iterator[dict]:
        """Run every parsing method over this sentence.

        Returns:
            Iterator[dict]: Days without name field.
        """

        for method in self.parsing_methods:
            for i in method(self):
                yield i

    def _parse_rest_1(self):
        # Sentence ends with 放假/补休/调休/公休 (optionally "共N天"): the
        # dates before the keyword are days off.
        match = re.match(r'(.+)(放假|补休|调休|公休)+(?:\d+天)?$', self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {
                    'date': i,
                    'isOffDay': True
                }

    def _parse_work_1(self):
        # Sentence ends with 上班: the dates before it are working days.
        match = re.match('(.+)上班$', self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {
                    'date': i,
                    'isOffDay': False
                }

    def _parse_shift_1(self):
        # "A调至B": holiday moved — A becomes a working day, B a day off.
        match = re.match('(.+)调至(.+)', self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {
                    'date': i,
                    'isOffDay': False
                }
            for i in self.extract_dates(match.group(2)):
                yield {
                    'date': i,
                    'isOffDay': True
                }

    # Tried in order; a sentence may satisfy more than one.
    parsing_methods = [
        _parse_rest_1,
        _parse_work_1,
        _parse_shift_1,
    ]
|
||
|
||
|
||
def parse_paper(year: int, url: str) -> Iterator[dict]:
    """Parse one paper into day records.

    Args:
        year (int): Year
        url (str): Paper url

    Raises:
        RuntimeError: When the paper text can not be parsed.

    Returns:
        Iterator[dict]: Days
    """

    text = get_paper(url)
    try:
        for name, description in get_rules(text):
            for day in DescriptionParser(description, year).parse():
                yield {'name': name, **day}
    except NotImplementedError as ex:
        raise RuntimeError('Can not parse paper', url) from ex
|
||
|
||
|
||
def fetch_holiday(year: int):
    """Fetch holiday data. """

    papers = get_paper_urls(year)
    papers.reverse()

    days_by_date = {}
    for url in papers:
        for day in parse_paper(year, url):
            # Later papers in the iteration win for the same calendar date.
            days_by_date[day['date']] = day

    return {
        'year': year,
        'papers': papers,
        'days': sorted(days_by_date.values(), key=lambda x: x['date']),
    }
|
||
|
||
|
||
def main():
    """Command line entry: print holiday data for one year as JSON."""

    parser = argparse.ArgumentParser()
    parser.add_argument('year', type=int)
    args = parser.parse_args()

    result = fetch_holiday(args.year)
    print(json.dumps(
        result,
        indent=4,
        ensure_ascii=False,
        cls=CustomJSONEncoder))
|
||
|
||
|
||
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes `date` objects as ISO-8601 strings."""

    def default(self, o):
        # pylint:disable=method-hidden
        if not isinstance(o, date):
            return super().default(o)
        return o.isoformat()
|
||
|
||
|
||
# Script entry point: run only when executed directly, not when imported.
if __name__ == '__main__':
    main()
|