
Connecting to Databases

How to Access Databases from Streamlit Applications

Reliable database connections are essential for any data-driven application. This chapter explains how to securely and efficiently connect Streamlit applications to various databases.

Why Database Connections Matter

BI dashboards aggregate data from diverse sources, including corporate data warehouses, cloud databases, and vector databases. Understanding how to connect to each type is key to building flexible and powerful dashboards.

Overview of Database Connections

Streamlit applications can improve data access speed and stability by managing database connections efficiently with connection pools and caching.

Core Concepts of Database Connections

What is a Connection Pool?

A connection pool is a cache of database connections maintained for reuse. This avoids the overhead of establishing a new connection for every request, which significantly improves application performance.

Feature         | With Connection Pool | Without Connection Pool
Connection Time | Fast (reused)        | Slow (created each time)
Resource Usage  | Efficient            | High
Stability       | High                 | Prone to connection errors
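
As a minimal sketch of the idea (anticipating the fuller PostgreSQL example later in this chapter), the SQLAlchemy engine below keeps a small pool, and every connect block borrows and returns a pooled connection instead of opening a new one. The connection URL is a placeholder:

from sqlalchemy import create_engine, text

# Minimal sketch: the pool is created once and reused across queries.
# Replace the placeholder URL with your own connection string.
engine = create_engine(
    "postgresql+psycopg://user:pass@localhost:5432/mydb",
    pool_size=5
)

for _ in range(3):
    # Each block checks a connection out of the pool and returns it on exit,
    # so no new TCP connection or authentication handshake is needed.
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

print(engine.pool.status())  # Shows pooled vs. checked-out connection counts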

Security Best Practices

It is critical to handle credentials like passwords and API keys securely. Hardcoding them in source code is a major security risk.

Unsafe Example:

# ❌ Hardcoding a password in your code
password = "my_secret_password"

Secure Approach:

# ✅ Retrieving the password from an environment variable
import os
password = os.environ["DATABASE_PASSWORD"]

A Look at Major Databases

Different databases serve different purposes. This comparison helps in choosing the right one for your needs.

Database   | Key Features                      | Common Use Cases                                | Scale
PostgreSQL | Open-source, feature-rich RDBMS   | General web applications, transactional systems | Small to large
Snowflake  | Cloud-native data warehouse       | Large-scale data analytics, BI                  | Large
BigQuery   | Google's serverless analytics DWH | Petabyte-scale analysis, large datasets         | Very large
Pinecone   | Vector database for AI            | AI/ML applications, similarity search           | Specialized

How to Connect to Major Databases

Connecting to PostgreSQL

PostgreSQL is a powerful, open-source relational database that is widely used and integrates smoothly with Streamlit.

Required Packages

sqlalchemy>=2.0
psycopg[binary]>=3.1
pandas>=2.2
python-dotenv>=1.0

Connection Code

import os
import streamlit as st
from sqlalchemy import create_engine, text
import pandas as pd
import dotenv

# Load environment variables from a .env file
dotenv.load_dotenv()

@st.cache_resource(show_spinner="⏳ Connecting to database...")
def get_engine():
    """Creates a SQLAlchemy engine with a connection pool."""
    db_url = os.environ["DATABASE_URL"]
    engine = create_engine(
        db_url,
        pool_size=5,
        pool_pre_ping=True
    )
    return engine

def get_data(sql):
    """Executes a SQL query and returns a DataFrame."""
    engine = get_engine()
    with engine.connect() as conn:
        df = pd.read_sql(text(sql), conn)
    return df

# Example usage
df = get_data("SELECT * FROM users LIMIT 1")
st.header("PostgreSQL Data Sample")
st.table(df)

Environment Variable Setup

  • DATABASE_URL: Your PostgreSQL connection string (e.g., postgresql+psycopg://user:password@localhost:5432/mydatabase). The +psycopg suffix tells SQLAlchemy to use the psycopg 3 driver installed above; a plain postgresql:// URL would make SQLAlchemy look for psycopg2 instead.
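
For local development, the same value can live in a .env file that dotenv.load_dotenv() reads at startup. A hypothetical example; substitute your own credentials and never commit this file:

# .env (keep out of version control)
DATABASE_URL=postgresql+psycopg://user:password@localhost:5432/mydatabase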

Key Points

  • Use @st.cache_resource to cache the connection engine.
  • Set pool_pre_ping=True to ensure connections are live.
  • Store the connection string in the DATABASE_URL environment variable.

Connecting to Snowflake

Snowflake is a cloud-native data warehouse designed for high-performance analytics on large datasets.

Required Packages

snowflake-connector-python[pandas]  # the pandas extra is required for fetch_pandas_all()
cryptography
python-dotenv>=1.0

Private Key Authentication

Snowflake recommends key-pair (private key) authentication for enhanced security. For setup instructions, see the official Snowflake documentation.

Base64 Encoding Your Private Key:

macOS/Linux:

base64 -i ~/.ssh/snowflake_rsa_key.p8 | tr -d '\n'

Windows (PowerShell):

[Convert]::ToBase64String([System.IO.File]::ReadAllBytes("C:\path\to\snowflake_rsa_key.p8"))

Implementation:

# snowflake_connection.py
import base64
import os
from cryptography.hazmat.primitives import serialization
import snowflake.connector
import streamlit as st
import dotenv

dotenv.load_dotenv()

def get_snowflake_connection():
    """Establishes a connection to Snowflake using private key authentication."""
    pem_bytes = base64.b64decode(os.environ["SNOWFLAKE_KEY_B64"])
    private_key = serialization.load_pem_private_key(pem_bytes, password=None)

    return snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        private_key=private_key,
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
        role=os.environ["SNOWFLAKE_ROLE"]
    )

def execute_snowflake_query(sql):
    """Executes a query on Snowflake and returns a DataFrame."""
    with get_snowflake_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            df = cursor.fetch_pandas_all()
    return df

# Example usage
df = execute_snowflake_query('SELECT * FROM PUBLIC."users" LIMIT 1')
st.header("Snowflake Connection Test")
st.table(df)

Environment Variable Setup

  • SNOWFLAKE_USER: Your Snowflake username.
  • SNOWFLAKE_ACCOUNT: Your account identifier.
  • SNOWFLAKE_KEY_B64: Your Base64-encoded private key.
  • SNOWFLAKE_WAREHOUSE: The warehouse to use.
  • SNOWFLAKE_DATABASE: The database to use.
  • SNOWFLAKE_ROLE: The role to use.
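
The helper above opens a fresh connection on every query. If your dashboard runs many queries per page load, you can cache the connection and the results with Streamlit, as in the PostgreSQL example. A minimal sketch reusing get_snowflake_connection from above:

import streamlit as st

@st.cache_resource(show_spinner="⏳ Connecting to Snowflake...")
def get_cached_snowflake_connection():
    """Caches a single Snowflake connection across reruns.
    Note: a long-lived cached connection can expire; clear the cache
    (st.cache_resource.clear()) and reconnect if queries start failing."""
    return get_snowflake_connection()

@st.cache_data(ttl=300)
def cached_snowflake_query(sql):
    """Caches query results for 5 minutes, keyed by the SQL text."""
    with get_cached_snowflake_connection().cursor() as cursor:
        cursor.execute(sql)
        return cursor.fetch_pandas_all()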

Connecting to BigQuery

BigQuery is Google Cloud's serverless data warehouse for analyzing petabyte-scale datasets.

Required Packages

google-cloud-bigquery[pandas]>=3.34.0
google-cloud-bigquery-storage>=2.32.0
python-dotenv>=1.0

Service Account Authentication

This guide authenticates to BigQuery with a service account JSON key. To create one, see the official Google Cloud documentation.

Base64 Encoding Your Service Account JSON:

macOS/Linux:

base64 -i service-account-key.json | tr -d '\n'

Windows (PowerShell):

[Convert]::ToBase64String([System.IO.File]::ReadAllBytes("C:\path\to\service-account-key.json"))

Implementation:

import base64
import json
import os
from google.oauth2 import service_account
from google.cloud import bigquery
import dotenv

# Load environment variables from a .env file
dotenv.load_dotenv()

def get_bigquery_client():
    """Creates a BigQuery client using a service account."""
    credentials_json = base64.b64decode(
        os.environ['SERVICE_ACCOUNT_JSON_BASE64']
    ).decode('utf-8')
    credentials_info = json.loads(credentials_json)
    credentials = service_account.Credentials.from_service_account_info(credentials_info)
    return bigquery.Client(credentials=credentials)

def query_bigquery(sql):
    """Queries BigQuery and returns a DataFrame."""
    client = get_bigquery_client()
    return client.query(sql).to_dataframe()

Environment Variable Setup

  • SERVICE_ACCOUNT_JSON_BASE64: Your Base64-encoded service account JSON.
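
To use this from a Streamlit page, the results can be cached just like in the PostgreSQL example. A minimal sketch building on query_bigquery above; the project, dataset, and table names are placeholders:

import streamlit as st

@st.cache_data(ttl=600, show_spinner="⏳ Querying BigQuery...")
def cached_bigquery_query(sql):
    """Caches BigQuery results for 10 minutes to limit repeated scans and cost."""
    return query_bigquery(sql)

st.header("BigQuery Data Sample")
df = cached_bigquery_query(
    "SELECT name, created_at FROM `my_project.my_dataset.users` LIMIT 10"
)
st.dataframe(df)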

Connecting to Pinecone (Vector Database)

Pinecone is a vector database for AI applications like similarity search.

Required Packages

pinecone>=7.0.0
openai>=1.92.2
python-dotenv>=1.0

Connection Method

import os
import openai
import streamlit as st
from pinecone import Pinecone, ServerlessSpec
import dotenv

dotenv.load_dotenv()

@st.cache_resource(show_spinner="🔄 Connecting to Pinecone...")
def get_pinecone_index():
    """Initializes and returns a Pinecone index."""
    pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    index_name = os.environ.get("PINECONE_INDEX", "my-index")

    if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,
            dimension=1536,  # Matches text-embedding-ada-002 embeddings
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1")  # Adjust to your environment
        )
    return pc.Index(index_name)

def search_vectors(query_text):
    """Searches for similar vectors in Pinecone."""
    index = get_pinecone_index()
    
    res = openai.embeddings.create(
        input=query_text,
        model="text-embedding-ada-002"
    )
    query_vector = res.data[0].embedding
    
    results = index.query(
        vector=query_vector,  # A single flat list of floats
        top_k=5,
        include_metadata=True
    )
    return results

# Example usage
st.header("Pinecone Vector Search Example")
query = "example query"
result = search_vectors(query)
st.write(result)

Environment Variable Setup

  • PINECONE_API_KEY: Your Pinecone API key.
  • PINECONE_INDEX: The name of the index.
  • OPENAI_API_KEY: Your OpenAI API key.
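
Searching assumes the index already contains vectors. As a hedged sketch of how documents might be added, using the same embedding model and the index helper above (the document IDs and sample texts are hypothetical):

def upsert_documents(docs):
    """Embeds each document and writes it to the Pinecone index with its text as metadata."""
    index = get_pinecone_index()
    res = openai.embeddings.create(input=docs, model="text-embedding-ada-002")
    vectors = [
        {"id": f"doc-{i}", "values": item.embedding, "metadata": {"text": doc}}
        for i, (doc, item) in enumerate(zip(docs, res.data))
    ]
    index.upsert(vectors=vectors)

# Example usage
upsert_documents(["First sample document", "Second sample document"])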

Tips for Writing Efficient Queries

Efficient queries are key to a responsive dashboard.

Basic Optimization Techniques

Technique               | Description                                                    | Example
Use Indexes             | Apply indexes to columns used in WHERE clauses.                | WHERE user_id = 123
Limit Results           | Use LIMIT to restrict rows from large tables.                  | SELECT * FROM users LIMIT 1000
Select Specific Columns | Avoid SELECT *; specify only the needed columns.               | SELECT name, email FROM users
Filter Early            | Reduce rows as early as possible, e.g. in a subquery or CTE before a JOIN. | JOIN (SELECT id FROM orders WHERE status = 'paid') o ON ...

Improving Performance

Push filtering to the database instead of processing in Python.

# ❌ Inefficient: Fetching all users, then filtering
def get_all_users_then_filter():
    df = get_data("SELECT * FROM users")
    return df[df['status'] == 'active']

# ✅ Efficient: Filtering in the database
def get_active_users():
    return get_data("""
        SELECT user_id, name, email, created_at
        FROM users
        WHERE status = 'active'
        LIMIT 1000
    """)

Handling Large Datasets

Process data in batches (pages) to avoid memory issues.

def get_data_in_batches(table_name, batch_size=10000):
    """Fetches data in batches to avoid out-of-memory errors."""
    # table_name is interpolated directly into the SQL string, so it must
    # come from trusted code, never from user input.
    offset = 0
    while True:
        sql = f"SELECT id, name, created_at FROM {table_name} ORDER BY id LIMIT {batch_size} OFFSET {offset}"
        batch_df = get_data(sql)
        if batch_df.empty:
            break
        yield batch_df
        offset += batch_size
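
For example, batches can be aggregated as they arrive so only small results stay in memory, with a progress indicator for the user. A sketch against a hypothetical users table:

# Example usage: count rows per batch without loading the whole table
total_rows = 0
progress = st.progress(0, text="Loading users in batches...")
for i, batch in enumerate(get_data_in_batches("users")):
    total_rows += len(batch)
    # Progress is approximate because the total row count is unknown up front
    progress.progress(min((i + 1) * 10, 100), text=f"Processed {total_rows:,} rows")
progress.empty()
st.metric("Total users processed", f"{total_rows:,}")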

Configuration and Troubleshooting

Managing Environment Variables

Store credentials as environment variables. For local development, set them in your shell:

export DATABASE_URL="postgresql://user:pass@localhost:5432/mydb"
export SNOWFLAKE_USER="myuser"

Common Errors and Solutions

Error                | Cause                                    | Solution
Connection Timeout   | Network issues or a slow server.         | Adjust timeout settings; implement retries (see the retry sketch below).
Authentication Error | Incorrect credentials.                   | Verify environment variables.
Pool Exhaustion      | All connections are in use.              | Increase pool_size; ensure connections are closed.
Query Error          | SQL syntax error or missing permissions. | Check the SQL syntax; grant the required database permissions.

Implementing Error Handling

def safe_database_query(sql):
    """Executes a query with error handling."""
    try:
        return get_data(sql)
    except Exception as e:
        st.error(f"Database error: {e}")
        return None

# Example
df = safe_database_query("SELECT * FROM users")
if df is not None:
    st.dataframe(df)
else:
    st.warning("Could not retrieve data.")

Performance Tuning

Caching Strategies

Use Streamlit's caching to reduce database load and accelerate your app.

# Cache the database connection object (create_connection is a placeholder
# for your own engine or client factory, such as get_engine above)
@st.cache_resource
def get_connection():
    return create_connection()

# Cache query results for 5 minutes (query_database is a placeholder for
# your own query helper, such as get_data above)
@st.cache_data(ttl=300)
def get_user_data():
    return query_database("SELECT * FROM users")
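
Cached results persist until the TTL expires, so it is common to give users an explicit refresh control. A small sketch using Streamlit's built-in cache clearing:

if st.button("🔄 Refresh data"):
    # Drop all cached query results so the next call hits the database again
    st.cache_data.clear()
    st.rerun()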

Optimizing the Connection Pool

Fine-tune connection pool settings for better performance.

# Example for PostgreSQL
engine = create_engine(
    database_url,
    pool_size=10,        # Number of connections to keep open
    max_overflow=20,     # Max additional connections
    pool_timeout=30,     # Seconds to wait before giving up
    pool_recycle=3600    # Recycle connections after 1 hour
)

Optimizing Data Transfer

Fetch only aggregated data from the database.

def get_dashboard_data():
    return get_data("""
        SELECT
            DATE(created_at) as date,
            COUNT(*) as user_count,
            AVG(score) as avg_score
        FROM users
        WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
        GROUP BY DATE(created_at)
        ORDER BY date
    """)

Summary

This chapter covered connecting Streamlit applications to major databases.

Key takeaways:

  1. Use Connection Pools: Manage connections efficiently with @st.cache_resource.
  2. Choose the Right Database: Select a database that fits your project's needs.
  3. Write Efficient Queries: Retrieve only the data you need to keep your app fast.
  4. Handle Errors Gracefully: Implement robust error handling.
  5. Optimize Performance: Use caching to reduce latency and database load.

Next Steps

Now that you can connect to your data, the next chapter will demonstrate how to use AI to generate and edit code, enabling you to build more sophisticated BI dashboards.