Connecting to Databases
How to Access Databases from Streamlit Applications
Reliable database connections are essential for any data-driven application. This chapter explains how to securely and efficiently connect Streamlit applications to various databases.
Why Database Connections Matter
BI dashboards aggregate data from diverse sources, including corporate data warehouses, cloud databases, and vector databases. Understanding how to connect to each type is key to building flexible and powerful dashboards.
In a Streamlit app, you can improve data access speed and stability by managing database connections efficiently with connection pools and caching.
Core Concepts of Database Connections
What is a Connection Pool?
A connection pool is a cache of database connections maintained for reuse. This avoids the overhead of establishing a new connection for every request, which significantly improves application performance.
Feature | With Connection Pool | Without Connection Pool |
---|---|---|
Connection Time | Fast (reused) | Slow (created each time) |
Resource Usage | Efficient | High |
Stability | High | Prone to connection errors |
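To make the reuse concrete, here is a minimal sketch (the connection URL is a placeholder) that checks out a pooled SQLAlchemy connection several times and then inspects the pool; the same physical connection is handed back out instead of being re-established each time.

```python
from sqlalchemy import create_engine, text

# Placeholder URL for illustration only; any SQLAlchemy-supported database works.
engine = create_engine("postgresql+psycopg://user:pass@localhost:5432/mydb", pool_size=5)

# Each block borrows a connection from the pool and returns it on exit.
for _ in range(3):
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

# The returned connection stays in the pool, ready for reuse.
print(engine.pool.status())  # e.g. "Pool size: 5  Connections in pool: 1 ..."
```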
Security Best Practices
It is critical to handle credentials like passwords and API keys securely. Hardcoding them in source code is a major security risk.
Unsafe Example:

```python
# ❌ Hardcoding a password in your code
password = "my_secret_password"
```

Secure Approach:

```python
# ✅ Retrieving the password from an environment variable
import os

password = os.environ["DATABASE_PASSWORD"]
```
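For local development, a common companion to environment variables is a `.env` file loaded with python-dotenv, which is the pattern the examples in this chapter assume. A minimal sketch:

```
# .env (add this file to .gitignore; never commit it)
DATABASE_PASSWORD=my_secret_password
```

```python
import os

import dotenv

dotenv.load_dotenv()  # reads key=value pairs from .env into os.environ
password = os.environ["DATABASE_PASSWORD"]
```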
A Look at Major Databases
Different databases serve different purposes. This comparison helps in choosing the right one for your needs.
Database | Key Features | Common Use Cases | Scale |
---|---|---|---|
PostgreSQL | Open-source, feature-rich RDBMS | General web applications, transactional systems | Small to large |
Snowflake | Cloud-native Data Warehouse | Large-scale data analytics, BI | Large |
BigQuery | Google's serverless analytics DWH | Petabyte-scale analysis, large datasets | Very large |
Pinecone | Vector database for AI | AI/ML applications, similarity search | Specialized |
How to Connect to Major Databases
Connecting to PostgreSQL
PostgreSQL is a powerful, open-source relational database that is widely used and integrates smoothly with Streamlit.
Required Packages
```
sqlalchemy>=2.0
psycopg[binary]>=3.1
pandas>=2.2
python-dotenv>=1.0
```
Connection Code
```python
import os
import streamlit as st
from sqlalchemy import create_engine, text
import pandas as pd
import dotenv

# Load environment variables from a .env file
dotenv.load_dotenv()


@st.cache_resource(show_spinner="⏳ Connecting to database...")
def get_engine():
    """Creates a SQLAlchemy engine with a connection pool."""
    db_url = os.environ["DATABASE_URL"]
    engine = create_engine(
        db_url,
        pool_size=5,
        pool_pre_ping=True
    )
    return engine


def get_data(sql):
    """Executes a SQL query and returns a DataFrame."""
    engine = get_engine()
    with engine.connect() as conn:
        df = pd.read_sql(text(sql), conn)
    return df


# Example usage
df = get_data("SELECT * FROM users LIMIT 1")
st.header("PostgreSQL Data Sample")
st.table(df)
```
Environment Variable Setup
- `DATABASE_URL`: Your PostgreSQL connection string (e.g., `postgresql+psycopg://user:password@localhost:5432/mydatabase`). The `+psycopg` suffix tells SQLAlchemy to use the psycopg 3 driver from the requirements above; a plain `postgresql://` URL defaults to psycopg2.
Key Points
- Use `@st.cache_resource` to cache the connection engine.
- Set `pool_pre_ping=True` to ensure connections are live.
- Store the connection string in the `DATABASE_URL` environment variable.
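When a query depends on user input (for example, a filter selected in the dashboard), pass the values as bind parameters rather than formatting them into the SQL string. This is a small sketch built on the `get_engine()` helper above; the query and column names are illustrative.

```python
import pandas as pd
from sqlalchemy import text

def get_data_with_params(sql, params):
    """Executes a parameterized query; the driver binds the values safely."""
    engine = get_engine()
    with engine.connect() as conn:
        return pd.read_sql(text(sql), conn, params=params)

# Example: the user-selected status never touches the SQL text directly
df = get_data_with_params(
    "SELECT user_id, name FROM users WHERE status = :status LIMIT 100",
    {"status": "active"},
)
```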
Connecting to Snowflake
Snowflake is a cloud-native data warehouse designed for high-performance analytics on large datasets.
Required Packages
```
snowflake-connector-python[pandas]
python-dotenv>=1.0
```
Private Key Authentication
Snowflake recommends key pair authentication (signing in with a private key instead of a password) for enhanced security. For setup instructions, see the official Snowflake documentation.
Base64 Encoding Your Private Key:
macOS/Linux:

```bash
base64 -i ~/.ssh/snowflake_rsa_key.p8 | tr -d '\n'
```

Windows (PowerShell):

```powershell
[Convert]::ToBase64String([System.IO.File]::ReadAllBytes("C:\path\to\snowflake_rsa_key.p8"))
```
Implementation:
```python
# snowflake_connection.py
import base64
import os

from cryptography.hazmat.primitives import serialization
import snowflake.connector
import streamlit as st
import dotenv

dotenv.load_dotenv()


def get_snowflake_connection():
    """Establishes a connection to Snowflake using private key authentication."""
    pem_bytes = base64.b64decode(os.environ["SNOWFLAKE_KEY_B64"])
    private_key = serialization.load_pem_private_key(pem_bytes, password=None)
    # The connector expects the key as DER-encoded bytes (PKCS#8, unencrypted).
    private_key_der = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        private_key=private_key_der,
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        role=os.environ["SNOWFLAKE_ROLE"]
    )


def execute_snowflake_query(sql):
    """Executes a query on Snowflake and returns a DataFrame."""
    with get_snowflake_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            df = cursor.fetch_pandas_all()
    return df


# Example usage
df = execute_snowflake_query('SELECT * FROM PUBLIC."users" LIMIT 1')
st.header("Snowflake Connection Test")
st.table(df)
```
Environment Variable Setup
- `SNOWFLAKE_USER`: Your Snowflake username.
- `SNOWFLAKE_ACCOUNT`: Your account identifier.
- `SNOWFLAKE_KEY_B64`: Your Base64-encoded private key.
- `SNOWFLAKE_WAREHOUSE`: The warehouse to use.
- `SNOWFLAKE_ROLE`: The role to use.
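Unlike the PostgreSQL example, the code above opens and closes a connection per query. If reconnecting becomes a bottleneck, one option is to cache the connection with `@st.cache_resource`, as in this sketch (it reuses the helper above; the reconnect check is a simple safeguard, not an official pattern):

```python
import streamlit as st

@st.cache_resource(show_spinner="⏳ Connecting to Snowflake...")
def get_cached_snowflake_connection():
    # Reuse a single connection across Streamlit reruns.
    return get_snowflake_connection()

def execute_cached_query(sql):
    conn = get_cached_snowflake_connection()
    if conn.is_closed():
        # The cached connection went stale; drop the cache and reconnect.
        st.cache_resource.clear()
        conn = get_cached_snowflake_connection()
    with conn.cursor() as cursor:
        cursor.execute(sql)
        return cursor.fetch_pandas_all()
```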
Connecting to BigQuery
BigQuery is Google Cloud's serverless data warehouse for analyzing petabyte-scale datasets.
Required Packages
```
google-cloud-bigquery[pandas]>=3.34.0
google-cloud-bigquery-storage>=2.32.0
python-dotenv>=1.0
```
Service Account Authentication
BigQuery uses service account JSON files for authentication. To create one, see the official Google Cloud documentation.
Base64 Encoding Your Service Account JSON:
macOS/Linux:

```bash
base64 -i service-account-key.json | tr -d '\n'
```

Windows (PowerShell):

```powershell
[Convert]::ToBase64String([System.IO.File]::ReadAllBytes("C:\path\to\service-account-key.json"))
```
Implementation:
```python
import base64
import json
import os

from google.oauth2 import service_account
from google.cloud import bigquery
import dotenv

dotenv.load_dotenv()


def get_bigquery_client():
    """Creates a BigQuery client using a service account."""
    credentials_json = base64.b64decode(
        os.environ['SERVICE_ACCOUNT_JSON_BASE64']
    ).decode('utf-8')
    credentials_info = json.loads(credentials_json)
    credentials = service_account.Credentials.from_service_account_info(credentials_info)
    # Pass the project explicitly so the client does not fall back to ambient credentials.
    return bigquery.Client(credentials=credentials, project=credentials.project_id)


def query_bigquery(sql):
    """Queries BigQuery and returns a DataFrame."""
    client = get_bigquery_client()
    return client.query(sql).to_dataframe()
```
Environment Variable Setup
- `SERVICE_ACCOUNT_JSON_BASE64`: Your Base64-encoded service account JSON.
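The BigQuery example above stops at returning a DataFrame. To wire it into a Streamlit page, the same caching pattern used for PostgreSQL applies; the table reference below is a hypothetical placeholder.

```python
import streamlit as st

@st.cache_resource(show_spinner="⏳ Connecting to BigQuery...")
def get_cached_bigquery_client():
    return get_bigquery_client()

@st.cache_data(ttl=600)  # cache query results for 10 minutes
def cached_query(sql):
    return get_cached_bigquery_client().query(sql).to_dataframe()

# Hypothetical table reference for illustration
df = cached_query("SELECT name, created_at FROM `my-project.my_dataset.users` LIMIT 10")
st.header("BigQuery Data Sample")
st.dataframe(df)
```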
Connecting to Pinecone (Vector Database)
Pinecone is a vector database for AI applications like similarity search.
Required Packages
```
pinecone>=7.0.0
openai>=1.92.2
python-dotenv>=1.0
```
Connection Method
```python
import os

import openai
import streamlit as st
from pinecone import Pinecone, ServerlessSpec
import dotenv

dotenv.load_dotenv()


@st.cache_resource(show_spinner="🔄 Connecting to Pinecone...")
def get_pinecone_index():
    """Initializes and returns a Pinecone index."""
    pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    index_name = os.environ.get("PINECONE_INDEX", "my-index")
    if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,
            dimension=1536,  # For text-embedding-ada-002
            metric="cosine",
            # A spec is required when creating an index; adjust cloud/region to your project.
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )
    return pc.Index(index_name)


def search_vectors(query_text):
    """Searches for similar vectors in Pinecone."""
    index = get_pinecone_index()
    res = openai.embeddings.create(
        input=query_text,
        model="text-embedding-ada-002"
    )
    query_vector = res.data[0].embedding
    results = index.query(
        vector=query_vector,  # a single flat list of floats
        top_k=5,
        include_metadata=True
    )
    return results


# Example usage
st.header("Pinecone Vector Search Example")
query = "example query"
result = search_vectors(query)
st.write(result)
```
Environment Variable Setup
- `PINECONE_API_KEY`: Your Pinecone API key.
- `PINECONE_INDEX`: The name of the index.
- `OPENAI_API_KEY`: Your OpenAI API key.
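Search only returns results once vectors have been stored in the index. As a complement to `search_vectors()`, here is a sketch of how documents could be embedded and upserted; the document list and ID scheme are hypothetical.

```python
def upsert_documents(documents):
    """Embeds text documents and writes them to the Pinecone index."""
    index = get_pinecone_index()
    vectors = []
    for i, doc_text in enumerate(documents):
        res = openai.embeddings.create(input=doc_text, model="text-embedding-ada-002")
        vectors.append({
            "id": f"doc-{i}",                 # hypothetical ID scheme
            "values": res.data[0].embedding,
            "metadata": {"text": doc_text},
        })
    index.upsert(vectors=vectors)

# Example usage
upsert_documents(["First sample document.", "Second sample document."])
```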
Tips for Writing Efficient Queries
Efficient queries are key to a responsive dashboard.
Basic Optimization Techniques
Technique | Description | Example |
---|---|---|
Use Indexes | Apply indexes to columns used in WHERE clauses. | `WHERE user_id = 123` |
Limit Results | Use `LIMIT` to restrict rows from large tables. | `SELECT * FROM users LIMIT 1000` |
Select Specific Columns | Avoid `SELECT *`; specify only needed columns. | `SELECT name, email FROM users` |
Filter Early | Reduce rows before joining large tables. | Filter in a subquery or CTE, then `JOIN` |
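As a concrete illustration of the "Use Indexes" row, an index can be created once (for example, during setup) through the same engine used for queries. This is a sketch assuming the `users` table and the `get_engine()` helper from earlier; the index name is arbitrary.

```python
from sqlalchemy import text

# One-off setup: index the column used in WHERE clauses so lookups avoid full table scans.
with get_engine().begin() as conn:
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_status ON users (status)"))
```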
Improving Performance
Push filtering to the database instead of processing in Python.
```python
# ❌ Inefficient: Fetching all users, then filtering
def get_all_users_then_filter():
    df = get_data("SELECT * FROM users")
    return df[df['status'] == 'active']


# ✅ Efficient: Filtering in the database
def get_active_users():
    return get_data("""
        SELECT user_id, name, email, created_at
        FROM users
        WHERE status = 'active'
        LIMIT 1000
    """)
```
Handling Large Datasets
Process data in batches (pages) to avoid memory issues.
```python
def get_data_in_batches(table_name, batch_size=10000):
    """Fetches data in batches to avoid out-of-memory errors."""
    offset = 0
    while True:
        sql = f"SELECT id, name, created_at FROM {table_name} ORDER BY id LIMIT {batch_size} OFFSET {offset}"
        batch_df = get_data(sql)
        if batch_df.empty:
            break
        yield batch_df
        offset += batch_size
```
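A sketch of consuming the generator from a Streamlit page, processing each batch as it arrives instead of holding the full table in memory (the cap of 10 batches on the progress bar is an arbitrary display assumption, since the total batch count is unknown up front):

```python
import streamlit as st

total_rows = 0
progress = st.progress(0.0, text="Loading users in batches...")
for i, batch in enumerate(get_data_in_batches("users"), start=1):
    total_rows += len(batch)
    # Cap the bar at 10 batches because the true count is unknown in advance.
    progress.progress(min(i / 10, 1.0), text=f"Processed {total_rows:,} rows")
st.success(f"Finished: {total_rows:,} rows processed")
```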
Configuration and Troubleshooting
Managing Environment Variables
Store credentials as environment variables. For local development, set them in your shell:
```bash
export DATABASE_URL="postgresql+psycopg://user:pass@localhost:5432/mydb"
export SNOWFLAKE_USER="myuser"
```
Common Errors and Solutions
Error | Cause | Solution |
---|---|---|
Connection Timeout | Network issues or slow server. | Adjust timeout settings; implement retries. |
Authentication Error | Incorrect credentials. | Verify environment variables. |
Pool Exhaustion | All connections are in use. | Increase `pool_size`; ensure connections are closed. |
Query Error | SQL syntax error or permissions. | Check SQL syntax; grant database permissions. |
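For transient failures such as connection timeouts, a simple retry loop with exponential backoff often helps. The sketch below wraps the `get_data()` helper from earlier; in practice you would catch your driver's specific exception classes rather than the broad `Exception`.

```python
import time

import streamlit as st

def get_data_with_retry(sql, retries=3, backoff_seconds=2):
    """Retries a query with exponential backoff on failure."""
    for attempt in range(1, retries + 1):
        try:
            return get_data(sql)
        except Exception as e:  # narrow this to your driver's timeout/connection errors
            if attempt == retries:
                raise
            wait = backoff_seconds * (2 ** (attempt - 1))
            st.warning(f"Query failed (attempt {attempt}/{retries}): {e}. Retrying in {wait}s...")
            time.sleep(wait)
```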
Implementing Error Handling
```python
def safe_database_query(sql):
    """Executes a query with error handling."""
    try:
        return get_data(sql)
    except Exception as e:
        st.error(f"Database error: {e}")
        return None


# Example
df = safe_database_query("SELECT * FROM users")
if df is not None:
    st.dataframe(df)
else:
    st.warning("Could not retrieve data.")
```
Performance Tuning
Caching Strategies
Use Streamlit's caching to reduce database load and accelerate your app.
```python
# Cache the database connection object
@st.cache_resource
def get_connection():
    return create_connection()


# Cache query results for 5 minutes
@st.cache_data(ttl=300)
def get_user_data():
    return query_database("SELECT * FROM users")
```
Optimizing the Connection Pool
Fine-tune connection pool settings for better performance.
```python
# Example for PostgreSQL
engine = create_engine(
    database_url,
    pool_size=10,       # Number of connections to keep open
    max_overflow=20,    # Max additional connections
    pool_timeout=30,    # Seconds to wait before giving up
    pool_recycle=3600   # Recycle connections after 1 hour
)
```
Optimizing Data Transfer
Fetch only aggregated data from the database.
```python
def get_dashboard_data():
    return get_data("""
        SELECT
            DATE(created_at) as date,
            COUNT(*) as user_count,
            AVG(score) as avg_score
        FROM users
        WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
        GROUP BY DATE(created_at)
        ORDER BY date
    """)
```
Summary
This chapter covered connecting Streamlit applications to major databases.
Key takeaways:
- Use Connection Pools: Manage connections efficiently with `@st.cache_resource`.
- Choose the Right Database: Select a database that fits your project's needs.
- Write Efficient Queries: Retrieve only the data you need to keep your app fast.
- Handle Errors Gracefully: Implement robust error handling.
- Optimize Performance: Use caching to reduce latency and database load.
Next Steps
Now that you can connect to your data, the next chapter will demonstrate how to use AI to generate and edit code, enabling you to build more sophisticated BI dashboards.