# wwu-577/src/data/scrape.py
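"""Scraping pipeline for memeorandum.com.

Click commands to download daily snapshot pages, parse them into stories.csv and
related.csv, load those CSVs into DuckDB, and normalize the result into publishers,
election-distance, and denormalized story tables.
"""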

import datetime
from datetime import date, timedelta
from pathlib import Path
from urllib.parse import urlparse

import click
import pandas as pd
import requests
from lxml import etree
from tld import get_tld
from tld.utils import update_tld_names
from tqdm import tqdm

from data.main import connect, data_dir


@click.command(name='scrape:load')
@click.option('--directory', type=Path, default=data_dir(), show_default=True)
@click.option('--database', type=Path, default=data_dir() / "stories.duckdb", show_default=True)
def load(directory, database):
    """Load the parsed stories.csv and related.csv into DuckDB tables."""
    stories = directory / "stories.csv"
    related = directory / "related.csv"
    # connect() comes from data.main; the --database option is not forwarded to it here
    db = connect()
    db.sql(f"""
        CREATE OR REPLACE TABLE stories AS
        SELECT
            *
        FROM read_csv_auto('{stories}')
    """)
    db.sql(f"""
        CREATE OR REPLACE TABLE related_stories AS
        SELECT
            *
        FROM read_csv_auto('{related}')
    """)
    db.close()
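

# After scrape:load, the resulting database can be inspected directly with duckdb.
# A minimal sketch (the .duckdb path is an assumption; it depends on how connect()
# in data.main is configured):
#
#   import duckdb
#   con = duckdb.connect("data/stories.duckdb")
#   con.sql("SELECT count(*) FROM stories").show()
#   con.sql("SELECT count(*) FROM related_stories").show()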


@click.command(name='scrape:download')
@click.option('-o', 'output_dir', type=Path, default=data_dir() / "memeorandum", show_default=True)
def download(output_dir):
    """Download every day from October 1, 2005 to today from memeorandum.com."""
    # make sure the output directory exists before writing into it
    output_dir.mkdir(parents=True, exist_ok=True)
    day = timedelta(days=1)
    cur = date(2005, 10, 1)
    end = date.today()
    dates = []
    while cur <= end:
        if not (output_dir / f"{cur.strftime('%y-%m-%d')}.html").exists():
            dates.append(cur)
        cur = cur + day
    date_iter = tqdm(dates, postfix="test")
    for i in date_iter:
        date_iter.set_postfix_str(f"{i}")
        save_as = output_dir / f"{i.strftime('%y-%m-%d')}.html"
        if save_as.exists():
            continue
        url = f"https://www.memeorandum.com/{i.strftime('%y%m%d')}/h2000"
        r = requests.get(url)
        with open(save_as, 'w') as f:
            f.write(r.text)


@click.command(name='scrape:parse')
@click.option('-d', '--directory', type=Path, default=data_dir() / "memeorandum", show_default=True)
@click.option('-o', '--output_dir', type=Path, default=data_dir(), show_default=True)
def parse(directory, output_dir):
    """Parse the html files on disk into a structured csv format."""
    update_tld_names()
    parser = etree.HTMLParser()
    pages = [f for f in directory.glob("*.html")]
    published = []
    others = []
    page_iter = tqdm(pages, postfix="starting")
    for page in page_iter:
        page_iter.set_postfix_str(f"{page}")
        published_at = datetime.datetime.strptime(page.stem, '%y-%m-%d')
        tree = etree.parse(str(page), parser)
        root = tree.getroot()
        if root is None:
            print(f"error opening {page}")
            continue
        items = root.xpath("//div[contains(@class, 'item')]")
        for item in items:
            out = dict()
            out['published_at'] = published_at
            citation = item.xpath('./cite')
            if not citation:
                continue
            author = citation[0]
            if author.text:
                author = ''.join(author.text.split('/')[:-1]).strip()
            else:
                author = ''
            out['author'] = author
            try:
                publisher_url = citation[0].getchildren()[0].get('href')
                publisher = citation[0].getchildren()[0].text
            except IndexError:
                # skip items whose citation has no link; otherwise publisher_url
                # would be undefined (or stale from a previous item) below
                print(f"error with citation url: {page}")
                continue
            out['publisher'] = publisher
            out['publisher_url'] = publisher_url
            title = item.xpath('.//strong/a')[0].text
            out['title'] = title
            url = item.xpath('.//strong/a')[0].get('href')
            out['url'] = url
            out['tld'] = get_tld(publisher_url)
            # NOTE: hash() is salted per process unless PYTHONHASHSEED is fixed,
            # so these ids are only stable within a single run
            item_id = hash((page.stem, url))
            out['id'] = item_id
            # old_id = hash((title, page.stem, publisher_url))
            # out['old_id'] = old_id
            published.append(out)
            related = item.xpath(".//span[contains(@class, 'mls')]/a")
            for relation in related:
                another = dict()
                another['url'] = relation.get('href')
                another['publisher'] = relation.text
                another['parent_id'] = item_id
                another['publisher_domain'] = urlparse(another['url']).netloc
                others.append(another)
    df = pd.DataFrame(published)
    df.to_csv(output_dir / 'stories.csv', sep='|', index=False)
    df = pd.DataFrame(others)
    df.to_csv(output_dir / 'related.csv', sep='|', index=False)
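

# Columns written by scrape:parse (pipe-delimited):
#   stories.csv: published_at, author, publisher, publisher_url, title, url, tld, id
#   related.csv: url, publisher, parent_id, publisher_domain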


@click.command(name='scrape:normalize')
def normalize():
    """fix database after load. remove duplicates. create publishers."""
    DB = connect()
    DB.sql("""
        DELETE FROM stories
        WHERE id IN (
            WITH cte AS (
                SELECT
                    url
                    ,id
                    ,ROW_NUMBER() OVER(PARTITION BY url) AS url_ctn
                    ,ROW_NUMBER() OVER(PARTITION BY title) AS title_ctn
                FROM stories
            )
            SELECT
                id
            FROM cte
            WHERE url_ctn > 1
                OR title_ctn > 1
        )
    """)
    DB.sql("""
        CREATE OR REPLACE TABLE publishers AS
        with cte as (
            SELECT
                s.publisher as name
                ,s.publisher_url_domain as url
            FROM stories s
            GROUP BY
                s.publisher
                ,s.publisher_url_domain
        ), together AS (
            SELECT
                COALESCE(cte.name, r.publisher) AS name
                ,COALESCE(cte.url, r.publisher_domain) as url
            FROM cte
            FULL OUTER JOIN related_stories r
                ON cte.url = r.publisher_domain
        )
        SELECT
            ROW_NUMBER() OVER() as id
            ,t.name
            ,t.url
        FROM together t
        where t.url is not null
        GROUP BY
            name
            ,url
    """)
    DB.sql("""
        alter table stories
        add column publisher_id bigint
    """)
    DB.sql("""
        update stories
        set publisher_id = publishers.id
        from publishers
        where publishers.url = stories.publisher_url_domain
    """)
    DB.sql("""
        alter table stories alter publisher_id set data type bigint
    """)
    DB.sql("""
        alter table stories drop publisher;
        alter table stories drop publisher_url;
        alter table stories drop publisher_url_domain;
        alter table stories drop domain;
    """)
    DB.sql("""
        alter table related_stories
        add column publisher_id bigint
    """)
    DB.sql("""
        update related_stories
        set publisher_id = publishers.id
        from publishers
        where publishers.url = related_stories.publisher_domain
    """)
    DB.sql("""
        alter table related_stories drop publisher;
        alter table related_stories drop publisher_domain;
    """)


def another_norm():
    """One-off helper (not registered as a click command): rebuild publishers keyed
    on tld and backfill publisher_id on stories and related_stories."""
    # map_tld is not defined or imported in this module; it is assumed to be
    # available wherever this helper was actually run.
    DB = connect()
    sv2 = pd.read_csv(data_dir() / 'stories.csv', sep="|")
    related = pd.read_csv(data_dir() / 'related.csv', sep="|")
    sv2['tld'] = sv2.publisher_url.apply(lambda x: map_tld(x))
    related['tld'] = related.url.apply(lambda x: map_tld(x))
    # keep one publisher name per tld, preferring the name used most often in 2022
    new_pub = DB.query("""
        with cte as (
            select
                tld
                ,publisher
                ,count(1) filter(where year(published_at) = 2022) as recent_ctn
                ,count(1) as ctn
            from sv2
            group by
                tld
                ,publisher
        )
        ,r as (
            select
                tld
                ,publisher
                ,ctn
                ,row_number() over(partition by tld order by recent_ctn desc) as rn
            from cte
        )
        select
            row_number() over() as id
            ,publisher as name
            ,tld
        from r
        where rn = 1
        order by ctn desc
    """).df()
    DB.query("""
        CREATE OR REPLACE TABLE publishers AS
        SELECT
            id
            ,name
            ,tld
        FROM new_pub
    """)
    DB.query("""alter table stories add column tld text""")
    s_url = DB.query("""
        select
            id
            ,url
        from stories
    """).df()
    s_url['tld'] = s_url.url.apply(lambda x: map_tld(x))
    DB.query("""
        update stories
        set tld = s_url.tld
        from s_url
        where s_url.id = stories.id
    """)
    DB.query("""
        update stories
        set publisher_id = p.id
        from publishers p
        where p.tld = stories.tld
    """)
    DB.query("""
        update related_stories
        set publisher_id = p.id
        from publishers p
        join related r
            on r.tld = p.tld
        where r.url = related_stories.url
    """)
    # sanity check
    DB.sql("""
        SELECT
            s.id
            ,sv2.publisher_url
        FROM stories s
        JOIN sv2
            on sv2.id = s.id
        limit 5
    """)


@click.command('data:create-election-table')
def create_elections_table():
    """Load election_dates.csv and build election_distance: for every publish date,
    the signed number of days to its nearest election and that election's winner."""
    df = pd.read_csv(data_dir() / 'election_dates.csv', sep="|")
    df['date'] = pd.to_datetime(df.date)
    DB = connect()
    DB.query("""
        CREATE OR REPLACE TABLE election_dates AS
        SELECT
            row_number() over() as id
            ,type
            ,date
            ,winner
        FROM df
    """)
    DB.query("""
        CREATE OR REPLACE TABLE election_distance AS
        WITH cte as (
            SELECT
                day(e.date - s.published_at) as days_away
                ,e.id as election_id
                ,e.date as election_date
                ,s.published_at as publish_date
                ,e.winner as winner
            FROM (
                SELECT
                    DISTINCT
                    published_at
                FROM top.stories
            ) s
            CROSS JOIN election_dates e
        ) , windowed as (
            SELECT
                row_number() over(partition by publish_date order by abs(days_away) asc) as rn
                ,days_away
                ,publish_date
                ,election_date
                ,election_id
                ,winner
            FROM cte
        )
        SELECT
            days_away
            ,publish_date
            ,election_date
            ,election_id
            ,winner
        FROM windowed
        WHERE rn = 1
    """)
    DB.close()


@click.command('scrape:create-denorm')
def create_denorm():
    """Build denorm.stories: one wide table joining stories to publisher, sentiment,
    election distance, bias, link PCA coordinates, and emotion."""
    DB = connect()
    DB.sql("create schema if not exists denorm")
    DB.sql("""
        CREATE OR REPLACE TABLE denorm.stories AS
        SELECT
            s.id as story_id
            ,s.title
            ,s.url
            ,s.published_at
            ,s.author
            ,p.name as publisher
            ,p.tld as tld
            ,sent.class_id as sentiment
            ,d.days_away as election_distance
            ,b.ordinal as bias
            ,pca.first as link_1
            ,pca.second as link_2
            ,e.emotion_id as emotion
        FROM top.stories s
        JOIN top.publishers p
            ON p.id = s.publisher_id
        JOIN top.story_sentiments sent
            ON s.id = sent.story_id
        JOIN election_distance d
            ON d.election_date = s.published_at
        JOIN publisher_bias pb
            ON pb.publisher_id = p.id
        JOIN bias_ratings b
            ON b.id = pb.bias_id
        JOIN top.publisher_pca_onehot pca
            ON pca.publisher_id = p.id
        JOIN story_emotions e
            ON e.story_id = s.id
    """)
    DB.close()
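

# None of these commands are wired to an entry point in this module. A minimal
# sketch of how they could be registered, assuming a click group defined elsewhere
# in the project (the names below are hypothetical):
#
#   @click.group()
#   def cli():
#       pass
#
#   for cmd in (load, download, parse, normalize, create_elections_table, create_denorm):
#       cli.add_command(cmd)
#
#   if __name__ == "__main__":
#       cli()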