Saving Scraped Data to PostgreSQL
Once a crawl produces more than a handful of rows, a real database beats flat files, and this guide — part of Storing and Exporting Scraped Data — shows how to load scraped records into PostgreSQL efficiently with psycopg.
The fast, correct pattern is to batch rows and insert them with psycopg's execute_values, using an ON CONFLICT clause so re-scraping updates existing rows instead of erroring or duplicating. Batching turns thousands of individual round-trips into a few, an ON CONFLICT ... DO UPDATE upsert makes your loader idempotent, and a connection pool keeps concurrent workers from exhausting the server. For teams that prefer an ORM, SQLAlchemy offers the same upsert semantics with a higher-level API.
Why Batch Inserts and Upserts Matter
Inserting rows one at a time is the classic scraper bottleneck. Each INSERT is a network round-trip plus a transaction commit, so ten thousand rows become ten thousand round-trips — often slower than the crawl that produced them. Batching many rows into a single statement collapses that overhead. psycopg's execute_values builds one multi-row INSERT from a list of tuples, cutting round-trips by orders of magnitude and letting PostgreSQL plan the write once.
Idempotency is the second requirement. Scrapers re-run, retry, and overlap, so the same record will arrive more than once. Without protection, a unique constraint violation aborts the whole batch; without a unique constraint, you silently accumulate duplicates. PostgreSQL's ON CONFLICT clause solves both: declare a unique key, and on collision either skip the row (DO NOTHING) or update it (DO UPDATE). This upsert makes reloading the same data safe, which pairs naturally with the retry logic in Retrying Failed Requests with Tenacity.
Schema and Bulk Upsert with psycopg
Start with a table that has a natural unique key — the source URL or a product ID works well. The example below fetches a page, extracts a couple of fields, and bulk-upserts a batch. It uses execute_values with a RETURNING-free ON CONFLICT upsert so re-running the scraper refreshes title and scraped_at without creating duplicates.
import datetime as dt
import httpx
import psycopg
from psycopg.rows import dict_row
from psycopg.extras import execute_values # psycopg2-style helper
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36"}
DSN = "postgresql://scraper:secret@localhost:5432/scrapes"
DDL = """
CREATE TABLE IF NOT EXISTS products (
id BIGSERIAL PRIMARY KEY,
source_url TEXT UNIQUE NOT NULL,
title TEXT NOT NULL,
price_cents INTEGER,
scraped_at TIMESTAMPTZ NOT NULL
);
"""
UPSERT = """
INSERT INTO products (source_url, title, price_cents, scraped_at)
VALUES %s
ON CONFLICT (source_url) DO UPDATE SET
title = EXCLUDED.title,
price_cents = EXCLUDED.price_cents,
scraped_at = EXCLUDED.scraped_at;
"""
def scrape_rows(urls: list[str]) -> list[tuple[str, str, int, dt.datetime]]:
rows: list[tuple[str, str, int, dt.datetime]] = []
now = dt.datetime.now(dt.timezone.utc)
with httpx.Client(headers=HEADERS, timeout=15.0) as client:
for url in urls:
resp = client.get(url)
resp.raise_for_status()
title = resp.headers.get("x-title", "Example Product")
rows.append((url, title, 1999, now))
return rows
def save(rows: list[tuple[str, str, int, dt.datetime]]) -> None:
with psycopg.connect(DSN, row_factory=dict_row) as conn:
with conn.cursor() as cur:
cur.execute(DDL)
execute_values(cur, UPSERT, rows, page_size=500)
conn.commit()
if __name__ == "__main__":
data = scrape_rows(["https://httpbin.org/get?p=1", "https://httpbin.org/get?p=2"])
save(data)
print(f"upserted {len(data)} rows")
The EXCLUDED pseudo-table refers to the row that would have been inserted, so the DO UPDATE branch copies the fresh values over the existing row. page_size controls how many rows go in each underlying statement — 500 to 1000 is a good starting range.
Connection Pooling for Concurrent Workers
A crawl with many concurrent workers must not open a new database connection per task; connections are expensive and PostgreSQL caps them. psycopg_pool provides a ConnectionPool that hands out and recycles a bounded set of connections, so a distributed crawl — like the ones described in Scaling and Deploying Python Web Scrapers — shares a small pool instead of stampeding the server.
import datetime as dt
from psycopg_pool import ConnectionPool
from psycopg.extras import execute_values
DSN = "postgresql://scraper:secret@localhost:5432/scrapes"
pool = ConnectionPool(DSN, min_size=2, max_size=10)
UPSERT = """
INSERT INTO products (source_url, title, price_cents, scraped_at)
VALUES %s
ON CONFLICT (source_url) DO UPDATE SET
title = EXCLUDED.title,
price_cents = EXCLUDED.price_cents,
scraped_at = EXCLUDED.scraped_at;
"""
def save_batch(rows: list[tuple[str, str, int, dt.datetime]]) -> None:
with pool.connection() as conn:
with conn.cursor() as cur:
execute_values(cur, UPSERT, rows, page_size=500)
conn.commit()
if __name__ == "__main__":
now = dt.datetime.now(dt.timezone.utc)
batch = [("https://example.com/p/1", "Widget", 999, now)]
save_batch(batch)
print("saved via pool")
pool.close()
The SQLAlchemy Alternative
If you prefer an ORM or want database-agnostic code, SQLAlchemy exposes PostgreSQL's upsert through sqlalchemy.dialects.postgresql.insert(...).on_conflict_do_update(...). You trade a little raw throughput for typed models, migrations via Alembic, and easier joins when you later query the data. The choice mirrors the broader trade-off between hand-tuned drivers and higher-level frameworks discussed in Scrapy vs BeautifulSoup: Which to Use.
import datetime as dt
from sqlalchemy import create_engine, String, Integer, DateTime
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.dialects.postgresql import insert
engine = create_engine("postgresql+psycopg://scraper:secret@localhost:5432/scrapes")
class Base(DeclarativeBase):
pass
class Product(Base):
__tablename__ = "products_orm"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
source_url: Mapped[str] = mapped_column(String, unique=True)
title: Mapped[str] = mapped_column(String)
scraped_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True))
def upsert(rows: list[dict]) -> None:
Base.metadata.create_all(engine)
stmt = insert(Product).values(rows)
stmt = stmt.on_conflict_do_update(
index_elements=["source_url"],
set_={"title": stmt.excluded.title, "scraped_at": stmt.excluded.scraped_at},
)
with engine.begin() as conn:
conn.execute(stmt)
if __name__ == "__main__":
now = dt.datetime.now(dt.timezone.utc)
upsert([{"source_url": "https://example.com/p/9", "title": "Gadget", "scraped_at": now}])
print("orm upsert done")
Edge Cases and Caveats
- Wrap each batch in one transaction. Commit per batch, not per row. A single commit for 500 rows is dramatically faster and keeps the batch atomic so a mid-batch failure rolls back cleanly.
- Declare the conflict target explicitly.
ON CONFLICT (source_url)needs a matching unique constraint or unique index. Without one, PostgreSQL raises an error rather than upserting. - Mind
execute_valuespage_size. Too small loses the batching benefit; too large builds a giant statement that strains memory. Start around 500–1000 and measure. - Always parameterize values. Never build SQL by string-formatting scraped text — that invites SQL injection and quoting bugs. Pass values as tuples so the driver escapes them.
- Size the pool to the server.
max_sizeacross all workers must stay under PostgreSQL'smax_connections. For heavy fan-out, front the database with PgBouncer instead of a huge pool. - Store timezone-aware timestamps. Use
TIMESTAMPTZand UTC-aware datetimes so records scraped from different machines sort correctly.
Frequently Asked Questions
Why use execute_values instead of a loop of INSERT statements?
Each individual INSERT is a separate round-trip and commit, which becomes the bottleneck at scale. execute_values packs many rows into a single multi-row statement, cutting round-trips by orders of magnitude and letting PostgreSQL plan the write once. It is the simplest large win for a scraper's storage layer.
How does ON CONFLICT make my loader idempotent?ON CONFLICT (unique_key) DO UPDATE tells PostgreSQL that when an inserted row collides with an existing unique key, it should update the existing row with the new values instead of failing. That means re-running your scraper refreshes records rather than erroring or duplicating, so retries and overlapping crawls are safe.
Do I need a connection pool for a single-threaded scraper?
Not strictly — one long-lived connection is fine for a single worker. A pool becomes important once you run concurrent tasks, because opening a fresh connection per task is slow and can exhaust PostgreSQL's connection limit. psycopg_pool bounds and recycles connections for you.
Should I use psycopg directly or SQLAlchemy?
Use psycopg directly when you want maximum bulk-insert throughput and full control over SQL. Choose SQLAlchemy when you value typed models, migrations, and database-agnostic code and can accept a small overhead. Both support PostgreSQL upserts, so idempotency is available either way.