From b7e7ea81d18d125faca609aa04064297014a6930 Mon Sep 17 00:00:00 2001 From: jlevente Date: Thu, 30 May 2019 10:37:27 -0400 Subject: [PATCH 1/6] speed up import with psycopg2.extras.execute_batch --- changesetmd.py | 60 +++++++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/changesetmd.py b/changesetmd.py index 18d2e25..da74479 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -50,23 +50,28 @@ def createTables(self, connection): cursor.execute(queries.createGeometryColumn) connection.commit() - def insertNew(self, connection, id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags, comments): + def insertNewBatch(self, connection, data_arr): cursor = connection.cursor() if self.createGeometry: - cursor.execute('''INSERT into osm_changeset + sql = '''INSERT into osm_changeset (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags, geom) - values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))''', - (id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags, minLon, minLat, maxLon, maxLat)) + values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))''' + pyscopg2.extras.execute_batch(cursor, sql, data_arr) + cursor.close() else: - cursor.execute('''INSERT into osm_changeset + sql = '''INSERT into osm_changeset (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags) - values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''', - (id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags)) - for comment in comments: - cursor.execute('''INSERT into osm_changeset_comment + values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' + psycopg2.extras.execute_batch(cursor, sql, data_arr) + cursor.close() + + def insertNewBatchComment(self, 
connection, comment_arr): + cursor=connection.cursor() + sql = '''INSERT into osm_changeset_comment (comment_changeset_id, comment_user_id, comment_user_name, comment_date, comment_text) - values (%s,%s,%s,%s,%s)''', - (id, comment['uid'], comment['user'], comment['date'], comment['text'])) + values (%s,%s,%s,%s,%s)''' + psycopg2.extras.execute_batch(cursor, sql, comment_arr) + cursor.close() def deleteExisting(self, connection, id): cursor = connection.cursor() @@ -81,6 +86,8 @@ def parseFile(self, connection, changesetFile, doReplication): cursor = connection.cursor() context = etree.iterparse(changesetFile) action, root = context.next() + changesets = [] + comments = [] for action, elem in context: if(elem.tag != 'changeset'): continue @@ -91,28 +98,32 @@ def parseFile(self, connection, changesetFile, doReplication): for tag in elem.iterchildren(tag='tag'): tags[tag.attrib['k']] = tag.attrib['v'] - comments = [] for discussion in elem.iterchildren(tag='discussion'): for commentElement in discussion.iterchildren(tag='comment'): - comment = dict() - comment['uid'] = commentElement.attrib.get('uid') - comment['user'] = commentElement.attrib.get('user') - comment['date'] = commentElement.attrib.get('date') for text in commentElement.iterchildren(tag='text'): - comment['text'] = text.text + text = text.text + comment = (elem.attrib['id'], ommentElement.attrib.get('uid'), commentElement.attrib.get('user'), commentElement.attrib.get('date'), text) comments.append(comment) if(doReplication): self.deleteExisting(connection, elem.attrib['id']) - self.insertNew(connection, elem.attrib['id'], elem.attrib.get('uid', None), - elem.attrib['created_at'], elem.attrib.get('min_lat', None), - elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), - elem.attrib.get('max_lon', None),elem.attrib.get('closed_at', None), - elem.attrib.get('open', None), elem.attrib.get('num_changes', None), - elem.attrib.get('user', None), tags, comments) + if self.createGeometry: + id, 
user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags, geom + changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None), + elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None), + elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags,elem.attrib.get('min_lon', None), elem.attrib.get('min_lat', None), + elem.attrib.get('max_lon', None), elem.attrib.get('max_lat', None))) + else: + changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None), + elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None), + elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags)) if((parsedCount % 10000) == 0): + self.insertNewBatch(connection, changesets) + self.insertNewBatchComment(connection, comments ) + changesets = [] + comments = [] print "parsed %s" % ('{:,}'.format(parsedCount)) print "cumulative rate: %s/sec" % '{:,.0f}'.format(parsedCount/timedelta.total_seconds(datetime.now() - startTime)) @@ -120,6 +131,9 @@ def parseFile(self, connection, changesetFile, doReplication): elem.clear() while elem.getprevious() is not None: del elem.getparent()[0] + # Update whatever is left, then commit + self.isertNewBatch(connection, changesets) + self.insertNewBatchComment(connection, comments) connection.commit() print "parsing complete" print "parsed {:,}".format(parsedCount) From 864024a609fd00b328d35388246da0db5240429c Mon Sep 17 00:00:00 2001 From: jlevente Date: Thu, 30 May 2019 10:38:17 -0400 Subject: [PATCH 2/6] print status every 100k --- changesetmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changesetmd.py 
b/changesetmd.py index da74479..0167a68 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -119,7 +119,7 @@ def parseFile(self, connection, changesetFile, doReplication): elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None), elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags)) - if((parsedCount % 10000) == 0): + if((parsedCount % 100000) == 0): self.insertNewBatch(connection, changesets) self.insertNewBatchComment(connection, comments ) changesets = [] From 72c1de194a1d69f719302e6f4b93973fed3bbe91 Mon Sep 17 00:00:00 2001 From: Gitea Date: Wed, 31 Jul 2019 18:41:15 -0400 Subject: [PATCH 3/6] fix typo --- changesetmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changesetmd.py b/changesetmd.py index 0167a68..2ac6d9b 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -102,7 +102,7 @@ def parseFile(self, connection, changesetFile, doReplication): for commentElement in discussion.iterchildren(tag='comment'): for text in commentElement.iterchildren(tag='text'): text = text.text - comment = (elem.attrib['id'], ommentElement.attrib.get('uid'), commentElement.attrib.get('user'), commentElement.attrib.get('date'), text) + comment = (elem.attrib['id'], commentElement.attrib.get('uid'), commentElement.attrib.get('user'), commentElement.attrib.get('date'), text) comments.append(comment) if(doReplication): From 42fb06d1fa5c72ec0a7794c339587682e945775b Mon Sep 17 00:00:00 2001 From: Gitea Date: Wed, 31 Jul 2019 18:51:29 -0400 Subject: [PATCH 4/6] delete extra line --- changesetmd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/changesetmd.py b/changesetmd.py index 2ac6d9b..bf02e5d 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -109,7 +109,6 @@ def parseFile(self, connection, changesetFile, doReplication): self.deleteExisting(connection, elem.attrib['id']) if self.createGeometry: - id, user_id, created_at, 
min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags, geom changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None), elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None), elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags,elem.attrib.get('min_lon', None), elem.attrib.get('min_lat', None), From 3a25b95e0ce48654b014919c0bb4ef4d63dfb144 Mon Sep 17 00:00:00 2001 From: Gitea Date: Wed, 31 Jul 2019 19:01:53 -0400 Subject: [PATCH 5/6] fixed typo --- changesetmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changesetmd.py b/changesetmd.py index bf02e5d..5e1f004 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -56,7 +56,7 @@ def insertNewBatch(self, connection, data_arr): sql = '''INSERT into osm_changeset (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags, geom) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))''' - pyscopg2.extras.execute_batch(cursor, sql, data_arr) + psycopg2.extras.execute_batch(cursor, sql, data_arr) cursor.close() else: sql = '''INSERT into osm_changeset From 38054a56b716be3648db3ca8aca8e644652893ae Mon Sep 17 00:00:00 2001 From: Gitea Date: Thu, 1 Aug 2019 15:26:42 -0400 Subject: [PATCH 6/6] fixed typo --- changesetmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changesetmd.py b/changesetmd.py index 5e1f004..0b22453 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -131,7 +131,7 @@ def parseFile(self, connection, changesetFile, doReplication): while elem.getprevious() is not None: del elem.getparent()[0] # Update whatever is left, then commit - self.isertNewBatch(connection, changesets) + self.insertNewBatch(connection, changesets) 
self.insertNewBatchComment(connection, comments) connection.commit() print "parsing complete"