System Design

Database Sharding Best Practices for High Scale Teams

Battle-tested best practices for database sharding tailored to high-scale teams, including anti-patterns to avoid and a ready-to-use checklist.

Muneer Puthiya Purayil · 12 min read

At high scale, database sharding is not a choice — it is a survival mechanism. When your single database processes 200,000 queries per second and stores 50TB of data, no amount of vertical scaling or read replica tuning will save you. High-scale sharding demands a different engineering mindset: everything is a distributed systems problem, every query is a potential cross-shard operation, and every failure mode involves partial system availability. These best practices come from operating sharded databases at 500K+ QPS across hundreds of shards.

High-Scale Sharding Fundamentals

At high scale, the shard count alone creates operational challenges. Managing 100+ shards means 100+ connection pools, 100+ backup schedules, 100+ monitoring targets, and schema migrations that take hours to roll across all shards. Every practice must account for this operational multiplier.

Best Practices

1. Use Range-Based Sharding with Dynamic Split Points

Consistent hashing works well at moderate shard counts, but beyond roughly 50 shards, range-based sharding with automatic split points gives finer control over data distribution and makes targeted splits of hot ranges possible.

```go
// Uses github.com/cespare/xxhash for fast, stable key hashing.
type ShardRange struct {
	ShardID    string
	LowerBound uint64
	UpperBound uint64
	Host       string
	Port       int
	Status     string // active, splitting, draining
}

type RangeShardRouter struct {
	ranges []ShardRange
	mu     sync.RWMutex
}

func (r *RangeShardRouter) Route(shardKey string) (*ShardRange, error) {
	hash := xxhash.Sum64String(shardKey)

	r.mu.RLock()
	defer r.mu.RUnlock()

	// Ranges are kept sorted by LowerBound, so a binary search finds
	// the first range whose upper bound covers the key hash.
	idx := sort.Search(len(r.ranges), func(i int) bool {
		return r.ranges[i].UpperBound >= hash
	})

	if idx >= len(r.ranges) {
		return nil, fmt.Errorf("no shard found for key hash %d", hash)
	}

	shard := &r.ranges[idx]
	if shard.Status != "active" {
		return nil, fmt.Errorf("shard %s is %s", shard.ShardID, shard.Status)
	}

	return shard, nil
}

func (r *RangeShardRouter) SplitShard(shardID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	for i, shard := range r.ranges {
		if shard.ShardID != shardID {
			continue
		}

		midpoint := shard.LowerBound + (shard.UpperBound-shard.LowerBound)/2

		// Create a new shard for the upper half of the range. Host and
		// Port are assigned later, when the target node is provisioned.
		newShard := ShardRange{
			ShardID:    fmt.Sprintf("%s-upper", shardID),
			LowerBound: midpoint + 1,
			UpperBound: shard.UpperBound,
			Status:     "splitting",
		}

		// Shrink the original shard to the lower half.
		r.ranges[i].UpperBound = midpoint
		r.ranges = append(r.ranges, newShard)

		sort.Slice(r.ranges, func(a, b int) bool {
			return r.ranges[a].LowerBound < r.ranges[b].LowerBound
		})

		return nil
	}
	return fmt.Errorf("shard %s not found", shardID)
}
```

2. Implement Connection Pool Management Per Shard

At 100+ shards, connection management becomes a primary concern. Each shard connection pool must be independently tunable.

```go
type ShardPoolManager struct {
	pools   map[string]*pgxpool.Pool
	configs map[string]*pgxpool.Config
	mu      sync.RWMutex
	metrics *ShardMetrics
}

func NewShardPoolManager(shards []ShardConfig) (*ShardPoolManager, error) {
	manager := &ShardPoolManager{
		pools:   make(map[string]*pgxpool.Pool),
		configs: make(map[string]*pgxpool.Config),
		metrics: NewShardMetrics(),
	}

	for _, shard := range shards {
		config, err := pgxpool.ParseConfig(shard.DSN)
		if err != nil {
			return nil, fmt.Errorf("parse config for %s: %w", shard.ID, err)
		}

		config.MaxConns = int32(shard.MaxConnections)
		config.MinConns = int32(shard.MinConnections)
		config.MaxConnLifetime = 30 * time.Minute
		config.MaxConnIdleTime = 5 * time.Minute
		config.HealthCheckPeriod = 30 * time.Second

		// Capture the shard ID by value so the callback does not close
		// over the loop variable (required before Go 1.22).
		shardID := shard.ID
		config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
			manager.metrics.ConnectionAcquired(shardID)
			return true
		}

		pool, err := pgxpool.NewWithConfig(context.Background(), config)
		if err != nil {
			return nil, fmt.Errorf("create pool for %s: %w", shard.ID, err)
		}

		manager.pools[shard.ID] = pool
		manager.configs[shard.ID] = config
	}

	return manager, nil
}

func (m *ShardPoolManager) GetPool(shardID string) (*pgxpool.Pool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	pool, ok := m.pools[shardID]
	if !ok {
		return nil, fmt.Errorf("unknown shard: %s", shardID)
	}
	return pool, nil
}

func (m *ShardPoolManager) HealthCheck(ctx context.Context) map[string]ShardHealth {
	m.mu.RLock()
	defer m.mu.RUnlock()

	results := make(map[string]ShardHealth)
	var wg sync.WaitGroup
	var mu sync.Mutex

	// Ping every shard concurrently; a slow shard must not serialize
	// the health check for the other 99+.
	for id, pool := range m.pools {
		wg.Add(1)
		go func(shardID string, p *pgxpool.Pool) {
			defer wg.Done()

			start := time.Now()
			err := p.Ping(ctx)
			latency := time.Since(start)

			stat := p.Stat()
			health := ShardHealth{
				ShardID:       shardID,
				Healthy:       err == nil,
				Latency:       latency,
				TotalConns:    stat.TotalConns(),
				IdleConns:     stat.IdleConns(),
				AcquiredConns: stat.AcquiredConns(),
			}

			mu.Lock()
			results[shardID] = health
			mu.Unlock()
		}(id, pool)
	}

	wg.Wait()
	return results
}
```
91 

3. Build Parallel Scatter-Gather with Bounded Concurrency

Cross-shard queries must execute in parallel with timeouts and partial failure handling.

```go
type ScatterGatherExecutor struct {
	poolManager *ShardPoolManager
	maxParallel int
	timeout     time.Duration
}

func (e *ScatterGatherExecutor) Execute(
	ctx context.Context,
	shardIDs []string,
	query string,
	args []interface{},
) ([]ShardResult, error) {
	ctx, cancel := context.WithTimeout(ctx, e.timeout)
	defer cancel()

	results := make([]ShardResult, len(shardIDs))
	sem := make(chan struct{}, e.maxParallel) // bounds in-flight shard queries
	var wg sync.WaitGroup

	for i, shardID := range shardIDs {
		wg.Add(1)
		go func(idx int, id string) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()

			pool, err := e.poolManager.GetPool(id)
			if err != nil {
				results[idx] = ShardResult{ShardID: id, Error: err}
				return
			}

			rows, err := pool.Query(ctx, query, args...)
			if err != nil {
				results[idx] = ShardResult{ShardID: id, Error: err}
				return
			}
			defer rows.Close()

			var data []map[string]interface{}
			for rows.Next() {
				values, err := rows.Values()
				if err != nil {
					results[idx] = ShardResult{ShardID: id, Error: err}
					return
				}
				fields := rows.FieldDescriptions()
				row := make(map[string]interface{})
				for j, field := range fields {
					row[string(field.Name)] = values[j]
				}
				data = append(data, row)
			}
			if err := rows.Err(); err != nil {
				results[idx] = ShardResult{ShardID: id, Error: err}
				return
			}

			results[idx] = ShardResult{ShardID: id, Data: data}
		}(i, shardID)
	}

	wg.Wait()
	return results, nil
}
```
58 

4. Implement Online Shard Splitting

At high scale, you cannot afford downtime for shard splits. Implement online splitting with dual-write during the transition.

```go
type ShardSplitter struct {
	sourcePool *pgxpool.Pool
	targetPool *pgxpool.Pool
	router     *RangeShardRouter
}

func (s *ShardSplitter) Split(ctx context.Context, config SplitConfig) error {
	// Phase 1: Initial bulk copy
	log.Printf("Starting bulk copy from %s to %s", config.SourceShard, config.TargetShard)
	if err := s.bulkCopy(ctx, config); err != nil {
		return fmt.Errorf("bulk copy: %w", err)
	}

	// Phase 2: Enable dual-write (writes go to both shards)
	log.Printf("Enabling dual-write")
	s.router.EnableDualWrite(config.SourceShard, config.TargetShard, config.SplitPoint)

	// Phase 3: Catch-up copy (sync changes made during bulk copy)
	log.Printf("Starting catch-up sync")
	if err := s.catchUpSync(ctx, config); err != nil {
		return fmt.Errorf("catch-up sync: %w", err)
	}

	// Phase 4: Verify consistency
	log.Printf("Verifying consistency")
	if err := s.verifyConsistency(ctx, config); err != nil {
		return fmt.Errorf("consistency check: %w", err)
	}

	// Phase 5: Cut over routing
	log.Printf("Cutting over routing")
	s.router.CutOver(config.SourceShard, config.TargetShard, config.SplitPoint)

	// Phase 6: Clean up source shard (remove migrated data)
	log.Printf("Cleaning up source shard")
	return s.cleanupSource(ctx, config)
}
```
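The `EnableDualWrite` and `CutOver` router methods are not shown above. As a minimal, self-contained sketch of the routing state they might manipulate (the `DualWriteState` type and its fields are illustrative, not from the code above): keys at or below the split point stay on the source shard, keys above it are written to both shards during the split window, and only to the target after cut-over.

```go
package main

import "fmt"

// DualWriteState is an illustrative sketch of per-split routing state:
// the lower half of the range stays on the source, the upper half is
// dual-written until cut-over, then served by the target alone.
type DualWriteState struct {
	SourceShard string
	TargetShard string
	SplitPoint  uint64
	CutOverDone bool
}

// WriteTargets returns the shard(s) a write for keyHash must reach.
func (s *DualWriteState) WriteTargets(keyHash uint64) []string {
	if keyHash <= s.SplitPoint {
		return []string{s.SourceShard}
	}
	if s.CutOverDone {
		return []string{s.TargetShard}
	}
	// During the split window, the upper half is dual-written so the
	// target shard never misses a mutation made after the bulk copy.
	return []string{s.SourceShard, s.TargetShard}
}

func main() {
	st := &DualWriteState{SourceShard: "s1", TargetShard: "s1-upper", SplitPoint: 1000}
	fmt.Println(st.WriteTargets(500))  // [s1] — below split point, source only
	fmt.Println(st.WriteTargets(2000)) // [s1 s1-upper] — dual-write window
	st.CutOverDone = true
	fmt.Println(st.WriteTargets(2000)) // [s1-upper] — after cut-over
}
```

Reads for the upper half keep going to the source until cut-over, which is what makes the consistency check in Phase 4 meaningful: the source is still the authority while you verify the target.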
38 

5. Design for Partial Shard Failures

At high scale, individual shard failures are routine. The system must degrade gracefully.

```go
type ResilientShardClient struct {
	poolManager     *ShardPoolManager
	circuitBreakers map[string]*CircuitBreaker
}

func (c *ResilientShardClient) Query(
	ctx context.Context,
	shardID string,
	query string,
	args ...interface{},
) (pgx.Rows, error) {
	cb, ok := c.circuitBreakers[shardID]
	if !ok {
		return nil, fmt.Errorf("no circuit breaker for shard %s", shardID)
	}

	if !cb.AllowRequest() {
		return nil, &ShardUnavailableError{ShardID: shardID}
	}

	pool, err := c.poolManager.GetPool(shardID)
	if err != nil {
		cb.RecordFailure()
		return nil, err
	}

	rows, err := pool.Query(ctx, query, args...)
	if err != nil {
		cb.RecordFailure()
		return nil, err
	}

	cb.RecordSuccess()
	return rows, nil
}

type CircuitBreaker struct {
	failures    atomic.Int64
	lastFailure atomic.Int64
	threshold   int64
	resetAfter  time.Duration
}

func (cb *CircuitBreaker) AllowRequest() bool {
	if cb.failures.Load() < cb.threshold {
		return true
	}
	// After the cool-down period, allow a probe request through
	// (half-open); a success then resets the failure count.
	lastFail := time.Unix(0, cb.lastFailure.Load())
	return time.Since(lastFail) > cb.resetAfter
}

func (cb *CircuitBreaker) RecordFailure() {
	cb.failures.Add(1)
	cb.lastFailure.Store(time.Now().UnixNano())
}

func (cb *CircuitBreaker) RecordSuccess() {
	cb.failures.Store(0)
}
```

6. Automate Shard Rebalancing

Data skew is inevitable. Automate detection and rebalancing before manual intervention is required.

```go
func (m *ShardMonitor) DetectImbalance(ctx context.Context) []RebalanceAction {
	// Assumes ShardHealth also carries a DataSizeBytes field,
	// populated from per-shard size queries.
	healths := m.poolManager.HealthCheck(ctx)
	var actions []RebalanceAction

	var sizes []int64
	for _, h := range healths {
		sizes = append(sizes, h.DataSizeBytes)
	}

	avg := average(sizes)
	stddev := standardDeviation(sizes)
	if stddev == 0 {
		return nil // perfectly uniform; nothing to rebalance
	}

	for shardID, health := range healths {
		deviation := (float64(health.DataSizeBytes) - avg) / stddev
		if deviation > 2.0 {
			actions = append(actions, RebalanceAction{
				Type:    "split",
				ShardID: shardID,
				Reason:  fmt.Sprintf("size %.1f stddev above mean", deviation),
			})
		}
	}

	return actions
}
```


Anti-Patterns to Avoid

Shared Sequences Across Shards

A centralized sequence generator becomes a single point of failure and bottleneck. Use UUIDv7 or shard-prefixed Snowflake IDs that can be generated locally on each shard.
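As a sketch of the alternative, here is a Snowflake-style generator that mints IDs locally on each shard, with the shard ID embedded in the value (the epoch, bit widths, and `ShardLocalIDGen` name are illustrative choices, not a specific library's format):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ShardLocalIDGen mints 64-bit IDs with no cross-shard coordination:
// ~41 bits of milliseconds since a custom epoch, 10 bits of shard ID,
// and a 12-bit per-millisecond sequence. Bit widths are illustrative.
type ShardLocalIDGen struct {
	mu       sync.Mutex
	shardID  uint64 // must fit in 10 bits (0-1023)
	lastMs   uint64
	sequence uint64
}

const customEpochMs = 1700000000000 // arbitrary fixed epoch

func (g *ShardLocalIDGen) Next() uint64 {
	g.mu.Lock()
	defer g.mu.Unlock()

	nowMs := uint64(time.Now().UnixMilli()) - customEpochMs
	if nowMs == g.lastMs {
		g.sequence = (g.sequence + 1) & 0xFFF // 12-bit sequence
		if g.sequence == 0 {
			// Sequence exhausted for this millisecond; wait for the next one.
			for nowMs <= g.lastMs {
				nowMs = uint64(time.Now().UnixMilli()) - customEpochMs
			}
		}
	} else {
		g.sequence = 0
	}
	g.lastMs = nowMs

	return nowMs<<22 | g.shardID<<12 | g.sequence
}

// ShardOf recovers the shard ID embedded in an ID, so routing a
// primary-key lookup never needs a directory query.
func ShardOf(id uint64) uint64 {
	return (id >> 12) & 0x3FF
}

func main() {
	gen := &ShardLocalIDGen{shardID: 42}
	id := gen.Next()
	fmt.Println(ShardOf(id)) // prints 42
}
```

The embedded shard ID is the key property: any service holding the ID can route directly to the owning shard without a lookup.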

Unbounded Cross-Shard Queries

A query that fans out to all 100+ shards without pagination or limits can overwhelm the system. Always include limits in scatter-gather queries and paginate cross-shard results.
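A common mitigation is to push `LIMIT offset+limit` down to every shard, then merge-sort and truncate at the gateway. A self-contained sketch (the `Row` type and `mergeTopN` helper are illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

// Row is an illustrative result row with its sort key extracted.
type Row struct {
	SortKey int64
	ShardID string
}

// mergeTopN merges per-shard result slices and keeps the first `limit`
// rows after `offset`. Each shard must have been queried with
// LIMIT offset+limit so no globally-needed row can be missing.
func mergeTopN(perShard [][]Row, offset, limit int) []Row {
	var all []Row
	for _, rows := range perShard {
		all = append(all, rows...)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].SortKey < all[j].SortKey })

	if offset >= len(all) {
		return nil
	}
	end := offset + limit
	if end > len(all) {
		end = len(all)
	}
	return all[offset:end]
}

func main() {
	shard1 := []Row{{1, "s1"}, {4, "s1"}, {9, "s1"}}
	shard2 := []Row{{2, "s2"}, {3, "s2"}, {8, "s2"}}
	for _, r := range mergeTopN([][]Row{shard1, shard2}, 0, 3) {
		fmt.Println(r.SortKey, r.ShardID) // global top three: keys 1, 2, 3
	}
}
```

Note the cost model this implies: deep pagination across shards is expensive because every shard must return `offset+limit` rows, which is why cursor-based pagination is usually preferred past the first few pages.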

Manual Shard Assignment

Manually mapping tenants to shards works with 10 shards. At 100+ shards, any manual process introduces human error. Automate all shard assignment and rebalancing.
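As a minimal sketch of what automated assignment can look like (the `Directory` type and least-loaded policy here are illustrative; production systems would persist the mapping and weight by data size, not tenant count):

```go
package main

import "fmt"

// Directory records tenant-to-shard assignments so placement never
// depends on a human choosing. New tenants go to the least-loaded shard.
type Directory struct {
	tenantToShard map[string]string
	shardLoad     map[string]int
}

func NewDirectory(shardIDs []string) *Directory {
	d := &Directory{
		tenantToShard: make(map[string]string),
		shardLoad:     make(map[string]int),
	}
	for _, id := range shardIDs {
		d.shardLoad[id] = 0
	}
	return d
}

func (d *Directory) AssignTenant(tenantID string) string {
	if shard, ok := d.tenantToShard[tenantID]; ok {
		return shard // idempotent: re-assignment returns the existing mapping
	}
	best, bestLoad := "", int(^uint(0)>>1)
	for id, load := range d.shardLoad {
		// Tie-break on shard ID so assignment is deterministic
		// regardless of map iteration order.
		if load < bestLoad || (load == bestLoad && id < best) {
			best, bestLoad = id, load
		}
	}
	d.tenantToShard[tenantID] = best
	d.shardLoad[best]++
	return best
}

func main() {
	d := NewDirectory([]string{"shard-a", "shard-b"})
	fmt.Println(d.AssignTenant("t1")) // shard-a
	fmt.Println(d.AssignTenant("t2")) // shard-b
}
```

The idempotency check matters: assignment requests will be retried, and a retry must never move a tenant.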

High-Scale Readiness Checklist

  • Range-based routing with automatic split point management
  • Per-shard connection pool with independent tuning
  • Bounded-concurrency scatter-gather for cross-shard queries
  • Online shard splitting with zero downtime
  • Circuit breakers per shard for partial failure isolation
  • Automated shard imbalance detection and rebalancing
  • Distributed ID generation (UUIDv7 or Snowflake)
  • Per-shard monitoring with automated alerting
  • Schema migration tooling that handles 100+ shards
  • Shard-level backup and restore tested monthly
  • Load tested at 3x current peak per shard
  • Runbook for emergency shard evacuation

Conclusion

High-scale database sharding transforms your database from a single system to a distributed platform. Success requires treating every shard as an independent failure domain, automating everything that humans cannot do reliably at scale, and designing for partial availability rather than all-or-nothing consistency. The investment in routing infrastructure, online splitting, and automated rebalancing pays off exponentially as your shard count grows.


Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
