Implement parsing (WIP)

This commit is contained in:
NateScarlet 2019-03-08 00:11:53 +08:00
parent 4bcf0aed87
commit e6b04e9b25
No known key found for this signature in database
GPG Key ID: 5C242793B070309C
2 changed files with 158 additions and 12 deletions

View File

@ -2,7 +2,10 @@
"""Fetch holidays from gov.cn """ """Fetch holidays from gov.cn """
import argparse import argparse
import json
import re import re
from datetime import date, timedelta
from typing import List
import bs4 import bs4
import requests import requests
@ -13,7 +16,16 @@ SEARCH_URL = ('http://sousuo.gov.cn/s.htm'
'&pcodeJiguan=%E5%9B%BD%E5%8A%9E%E5%8F%91%E6%98%8E%E7%94%B5') '&pcodeJiguan=%E5%9B%BD%E5%8A%9E%E5%8F%91%E6%98%8E%E7%94%B5')
def get_paper_urls(year): def get_paper_urls(year: int) -> List[str]:
"""Find year related paper urls.
Args:
year (int): eg. 2018
Returns:
List[str]: Urls
"""
url = SEARCH_URL.format(year=year) url = SEARCH_URL.format(year=year)
body = requests.get(url).text body = requests.get(url).text
ret = re.findall( ret = re.findall(
@ -45,24 +57,154 @@ def get_rules(paper: str):
yield match.groups() yield match.groups()
def parse_holiday_description(year, description): def _cast_int(value):
pass return int(value) if value else None
def parse_paper(url): class SentenceParser:
pass """Parser for rule sentence. """
def __init__(self, sentence, year):
self.sentence = sentence
self.year = year
def extract_dates(self, value) -> List[date]:
memory = set()
for method in self.date_extraction_methods:
for i in method(self, value):
if i not in memory:
memory.add(i)
yield i
def _extract_dates_1(self, value):
match = re.match(r'(?:(\d+)年)?(?:(\d+)月)(\d+)日', value)
if match:
groups = [_cast_int(i) for i in match.groups()]
assert len(groups) == 3, groups
yield date(year=groups[0] or self.year,
month=groups[1], day=groups[2])
def _extract_dates_2(self, value):
match = re.match(
r'(?:(\d+)年)?(?:(\d+)月)(\d+)日(?:至|-)(?:(\d+)年)?(?:(\d+)月)?(\d+)日', value)
if match:
groups = [_cast_int(i) for i in match.groups()]
assert len(groups) == 6, groups
start = date(year=groups[0] or self.year,
month=groups[1], day=groups[2])
end = date(year=groups[3] or self.year,
month=groups[4] or groups[1], day=groups[5])
for i in range((end - start).days + 1):
yield start + timedelta(days=i)
def _extract_dates_3(self, value):
match = re.match(
r'(?:(\d+)年)?(?:(\d+)月)(\d+)日(?:[^]+)?(?:、(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:[^]+)?)+', value)
if match:
groups = [_cast_int(i) for i in match.groups()]
assert not (len(groups) % 3), groups
year = self.year
month = None
day = None
for i in range(0, len(groups), 3):
year = groups[i] or year
month = groups[i+1] or month
day = groups[i+2]
assert year
assert month
assert day
yield date(year=year, month=month, day=day)
date_extraction_methods = [
_extract_dates_1,
_extract_dates_2,
_extract_dates_3
]
def parse(self):
date_memory = set()
for method in self.parsing_methods:
for i in method(self):
if i['date'] in date_memory:
continue
date_memory.add(i['date'])
yield i
def _parse_rest_1(self):
match = re.match('(.+)放假(调休)?$', self.sentence)
if match:
for i in self.extract_dates(match.group(1)):
yield {
'date': i,
'isOffDay': True
}
def _parse_work_1(self):
match = re.match('(.+)上班$', self.sentence)
if match:
for i in self.extract_dates(match.group(1)):
yield {
'date': i,
'isOffDay': False
}
def _parse_work_2(self):
match = re.match('(.+)公休日调至(.+)', self.sentence)
if match:
for i in self.extract_dates(match.group(1)):
yield {
'date': i,
'isOffDay': False
}
for i in self.extract_dates(match.group(2)):
yield {
'date': i,
'isOffDay': True
}
parsing_methods = [
_parse_rest_1,
_parse_work_1,
_parse_work_2,
]
def parse_holiday_description(description: str, year: int):
for i in re.split('|。', description):
for j in SentenceParser(i, year).parse():
yield j
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('year') parser.add_argument('year', type=int)
args = parser.parse_args() args = parser.parse_args()
year = args.year
papers = get_paper_urls(year)
papers = get_paper_urls(args.year) ret = []
for i in papers: for i in papers:
paper = get_paper(i) paper = get_paper(i)
[print(i) for i in get_rules(paper)] rules = get_rules(paper)
for name, description in rules:
ret.extend({
'name': name,
**j
} for j in parse_holiday_description(description, year))
print(json.dumps(ret, indent=4, ensure_ascii=False, cls=CustomJSONEncoder))
class CustomJSONEncoder(json.JSONEncoder):
"""Custom json encoder. """
def default(self, o):
# pylint:disable=method-hidden
if isinstance(o, date):
return o.isoformat()
return super().default(o)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -2,7 +2,11 @@
import json import json
import sys import sys
from fetch_holidays import parse_holiday_description from fetch_holidays import CustomJSONEncoder, parse_holiday_description
def _normalize(iterable):
return sorted(json.loads(json.dumps(list(iterable), cls=CustomJSONEncoder)), key=lambda x: x['date'])
def _generate_tests(): def _generate_tests():
@ -12,8 +16,8 @@ def _generate_tests():
def create_test(case): def create_test(case):
def _test(): def _test():
year, description, expected = case['year'], case['description'], case['expected'] year, description, expected = case['year'], case['description'], case['expected']
assert parse_holiday_description( assert _normalize(parse_holiday_description(
year, description) == expected, case description, year)) == _normalize(expected), case
return _test return _test
for index, case in enumerate(cases, 1): for index, case in enumerate(cases, 1):