-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathinsert_pdf_recordthresher.py
More file actions
64 lines (53 loc) · 2.07 KB
/
insert_pdf_recordthresher.py
File metadata and controls
64 lines (53 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import json
import time
from datetime import datetime
from threading import Thread
from sqlalchemy import text
from recordthresher.pdf_record import PDFRecord
from app import oa_db_engine, db
# Maximum number of queue rows claimed per batch; interpolated into the
# LIMIT clause of the claim query below.
CHUNK_SIZE = 1000
# Running progress counter: incremented by insert_pdf_records_loop() and
# read by print_stats().
INSERTED = 0
# Claim query. The CTE picks up to CHUNK_SIZE rows from
# public.tmp_pdf_recordthresher_queue (joined to public.pdf_parsed on doi)
# whose in_progress flag is FALSE, using FOR UPDATE SKIP LOCKED. The UPDATE
# then sets in_progress = TRUE on the matching queue rows and returns the
# CTE columns (doi, authors, abstract, references) for each of them.
stmnt = f'''
with queue as (
SELECT q.doi, parsed.authors, parsed.abstract, parsed.references FROM public.tmp_pdf_recordthresher_queue q JOIN public.pdf_parsed parsed ON q.doi = parsed.doi
WHERE in_progress is FALSE
LIMIT {CHUNK_SIZE} FOR UPDATE SKIP LOCKED
)
update public.tmp_pdf_recordthresher_queue update_rows SET in_progress = TRUE
FROM queue WHERE update_rows.doi = queue.doi
RETURNING queue.*;
'''
def insert_pdf_records_loop():
    """Drain the PDF recordthresher queue in batches.

    Repeatedly executes ``stmnt`` to claim a batch of queue rows, saves one
    ``PDFRecord`` per returned row through ``db.session``, deletes the
    processed DOIs from ``public.tmp_pdf_recordthresher_queue`` and commits.
    Returns once the claim query yields no rows.

    Side effects:
        Adds the number of rows processed in each batch to the module-level
        ``INSERTED`` counter.
    """
    global INSERTED
    with oa_db_engine.connect() as conn:
        while True:
            rows = conn.execute(stmnt).fetchall()
            # Commit the claim (in_progress = TRUE) on the underlying
            # connection before the batch is processed.
            conn.connection.commit()
            if not rows:
                break

            # authors and references are serialised to JSON text; the
            # references column is stored on the record as ``citations``.
            pdf_records = [
                PDFRecord(doi=doi,
                          authors=json.dumps(authors),
                          abstract=abstract,
                          citations=json.dumps(references))
                for doi, authors, abstract, references in rows
            ]
            db.session.bulk_save_objects(pdf_records)

            # Remove the DOIs just handled from the queue.
            conn.execute(
                text('DELETE FROM public.tmp_pdf_recordthresher_queue WHERE doi IN :dois'),
                dois=tuple(record.doi for record in pdf_records),
            )
            db.session.commit()
            conn.connection.commit()

            # Count the rows actually processed: a batch can hold fewer than
            # CHUNK_SIZE rows (typically the last one), so adding CHUNK_SIZE
            # would overstate progress.
            INSERTED += len(pdf_records)
def print_stats():
    """Report insert progress on stdout every five seconds, indefinitely.

    Each report shows the current value of the module-level ``INSERTED``
    counter and the average insert rate per hour since this function
    started. The rate is reported as 0 while no time has elapsed.
    """
    started_at = datetime.now()
    seconds_per_hour = 60 * 60
    while True:
        elapsed = datetime.now() - started_at
        elapsed_hours = elapsed.total_seconds() / seconds_per_hour
        if elapsed_hours > 0:
            rate = round(INSERTED / elapsed_hours, 2)
        else:
            # Avoid dividing by zero on the very first report.
            rate = 0
        print(f'Inserted - {INSERTED} | Rate - {rate}/hr')
        time.sleep(5)
if __name__ == '__main__':
    # Progress reporting runs in a daemon thread, so it does not keep the
    # process alive after the insert loop returns.
    stats_thread = Thread(target=print_stats, daemon=True)
    stats_thread.start()
    insert_pdf_records_loop()