堆与 TopK 问题怎么解?

2025年 阅读约 15 分钟 面试指南 · 算法

深入解析堆与TopK问题:堆的原理与实现(上浮/下沉)、TopK(小顶堆 O(n log k) vs 快排partition O(n))、数据流中位数(双堆)、合并K个有序链表、前K个高频元素,附面试模拟问答。

一句话总结

堆是完全二叉树,用数组存储(索引 i 的左子 2i+1,右子 2i+2,父 (i-1)//2)。TopK 问题:求最大 K 个用小顶堆(O(n log k)),求最小 K 个用大顶堆。堆操作:插入(末尾插入 + 上浮 swim)、删除堆顶(末尾替换堆顶 + 下沉 sink)。Java 用 PriorityQueue(默认小顶堆),Python 用 heapq(默认小顶堆)。

初级理解

堆的实现(小顶堆)

class MinHeap: def __init__(self): self.heap = [] def parent(self, i): return (i - 1) // 2 def left(self, i): return 2 * i + 1 def right(self, i): return 2 * i + 2 # 上浮:新元素从底部向上调整 def swim(self, i): while i > 0 and self.heap[self.parent(i)] > self.heap[i]: self.heap[i], self.heap[self.parent(i)] = \ self.heap[self.parent(i)], self.heap[i] i = self.parent(i) # 下沉:堆顶元素向下调整 def sink(self, i): n = len(self.heap) while True: smallest = i l, r = self.left(i), self.right(i) if l < n and self.heap[l] < self.heap[smallest]: smallest = l if r < n and self.heap[r] < self.heap[smallest]: smallest = r if smallest == i: break self.heap[i], self.heap[smallest] = \ self.heap[smallest], self.heap[i] i = smallest def push(self, val): self.heap.append(val) self.swim(len(self.heap) - 1) def pop(self): if not self.heap: return None top = self.heap[0] self.heap[0] = self.heap[-1] self.heap.pop() if self.heap: self.sink(0) return top
一句话总结:堆是完全二叉树用数组存;插入上浮,删除下沉。

中级深入

TopK — 小顶堆法(O(n log k))

# 求第 K 大(或前 K 大) import heapq def find_kth_largest(nums, k): heap = [] for num in nums: heapq.heappush(heap, num) if len(heap) > k: heapq.heappop(heap) # 弹出最小的 return heap[0] # 堆顶是第 K 大 # 原理:维护大小为 k 的小顶堆 # 堆顶是 k 个元素中最小的 → 即第 K 大 # 遍历完所有元素后,堆中就是前 K 大 # 求前 K 个高频元素(LeetCode 347) def top_k_frequent(nums, k): from collections import Counter count = Counter(nums) heap = [] for num, freq in count.items(): heapq.heappush(heap, (freq, num)) if len(heap) > k: heapq.heappop(heap) return [num for freq, num in heap]

TopK — 快排 partition 法(O(n) 平均)

def find_kth_largest_partition(nums, k): # 第 K 大 = 第 n-k 小(索引) target = len(nums) - k def partition(left, right): pivot = nums[right] i = left for j in range(left, right): if nums[j] <= pivot: nums[i], nums[j] = nums[j], nums[i] i += 1 nums[i], nums[right] = nums[right], nums[i] return i left, right = 0, len(nums) - 1 while True: idx = partition(left, right) if idx == target: return nums[idx] elif idx < target: left = idx + 1 else: right = idx - 1
中级要点:TopK 用小顶堆 O(n log k);快排 partition O(n) 平均但会修改数组。

高级拓展

数据流中位数(LeetCode 295)— 双堆

import heapq class MedianFinder: def __init__(self): self.max_heap = [] # 左半部分(大顶堆,存负数) self.min_heap = [] # 右半部分(小顶堆) def add_num(self, num): # 先放左半(大顶堆) heapq.heappush(self.max_heap, -num) # 左半最大移到右半 heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap)) # 平衡:左半可以多一个 if len(self.min_heap) > len(self.max_heap): heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap)) def find_median(self): if len(self.max_heap) > len(self.min_heap): return -self.max_heap[0] return (-self.max_heap[0] + self.min_heap[0]) / 2

合并 K 个有序链表(LeetCode 23)

import heapq def merge_k_lists(lists): heap = [] # 每个链表的头节点入堆 for i, node in enumerate(lists): if node: heapq.heappush(heap, (node.val, i, node)) dummy = ListNode(0) curr = dummy while heap: val, i, node = heapq.heappop(heap) curr.next = node curr = curr.next if node.next: heapq.heappush(heap, (node.next.val, i, node.next)) return dummy.next

实战场景

场景:海量数据 TopK

# 10 亿数据找 Top 100 # 1. 分治:数据分片,每个分片找 Top 100 # 2. 合并:所有分片的 Top 100 合并再找 Top 100 # 小顶堆实现 import heapq def top_k_large_file(file_path, k): heap = [] with open(file_path) as f: for line in f: num = int(line.strip()) heapq.heappush(heap, num) if len(heap) > k: heapq.heappop(heap) return sorted(heap, reverse=True) # 时间复杂度:O(n log k) # 空间复杂度:O(k)

面试模拟

面试官:TopK 问题怎么解?

你:两种方法:1)小顶堆,维护大小为 k 的堆,遍历完堆中就是前 K 大,O(n log k);2)快排 partition,每次确定 pivot 位置,直到找到第 n-k 小的位置,O(n) 平均。堆法适合 k 较小或数据流场景,partition 法适合需要 O(n) 的场景。

面试官:数据流中位数怎么实现?

你:双堆法。大顶堆存左半部分(较小的一半),小顶堆存右半部分(较大的一半)。插入时先放大顶堆,再平衡两个堆的大小(大顶堆可以多一个)。中位数:总数奇数 → 大顶堆堆顶;偶数 → 两个堆顶的平均值。插入 O(log n),查询 O(1)。