Topic 10: Heap / Priority Queue
Key-Intuition: Heaps efficiently maintain order-statistics in dynamic datasets, great for priority-based retrieval and merging sorted sequences.
We can expect them to be especially useful when used as aux datastructures, in conjunction with other topic areas.
Canonical Question Types #
Top-K and Median Finding #
- Objective
- Efficiently find the k-th largest, smallest, or median elements from streaming data or static arrays.
Problems #
We can do this with heapq, as long as we use the API correctly; otherwise it will be too slow. Since we need the kth largest, we keep a min-heap with a fixed capacity of k: once it already holds k elements, we call heapreplace instead of heappush, and only when the new number beats the current minimum. A separate heappop followed by heappush does the same job but sifts the heap twice per element, which adds noticeable overhead.

```python
import heapq


class Solution:
    def findKthLargest(self, nums, k):
        heap = []  # min-heap holding the k largest values seen so far
        for num in nums:
            if len(heap) < k:
                heapq.heappush(heap, num)
            else:
                if num > heap[0]:
                    heapq.heapreplace(heap, num)
        return heap[0]
```

Alternatively, for this question, we can use the quickselect algorithm, though that is more manual.
```python
import random
from typing import List


class Solution:
    def findKthLargest(self, nums: List[int], k: int) -> int:
        def quickselect(left, right, target):
            if left == right:
                return nums[left]

            pivot_index = random.randint(left, right)
            # Partition around pivot and return final pivot position
            new_pivot = partition(left, right, pivot_index)

            if new_pivot == target:
                return nums[new_pivot]
            elif new_pivot < target:
                return quickselect(new_pivot + 1, right, target)
            else:
                return quickselect(left, new_pivot - 1, target)

        def partition(left, right, pivot_index):
            """
            Returns the pivot index after partitioning nums[left..right]
            around the pivot value.

            Afterwards, everything to the left of the pivot is STRICTLY LESS
            THAN the pivot, and everything to the right of the pivot is
            GREATER THAN OR EQUAL TO the pivot.
            """
            pivot_value = nums[pivot_index]
            # park the pivot at the right end while we scan
            nums[pivot_index], nums[right] = nums[right], nums[pivot_index]
            store_index = left

            for i in range(left, right):
                if nums[i] < pivot_value:
                    nums[store_index], nums[i] = nums[i], nums[store_index]
                    store_index += 1

            # move pivot to its final place
            nums[store_index], nums[right] = nums[right], nums[store_index]
            return store_index

        n = len(nums)
        target = n - k  # index of k-th largest in sorted order
        return quickselect(0, n - 1, target)
```

Kth Largest Element in a Stream (703)
The only thing that matters here is that we keep a fixed-capacity heap "manually", and that is good enough. There is no need to negate the values: a min-heap of the k largest elements keeps the kth largest at its root, so we let the structure handle the order and don't short-circuit that part.

```python
import heapq


class KthLargest:
    def __init__(self, k: int, nums: list[int]):
        self.k = k
        self.heap = nums
        heapq.heapify(self.heap)
        # prune it to only keep top k elements
        while len(self.heap) > self.k:
            heapq.heappop(self.heap)

    def add(self, val: int) -> int:
        heapq.heappush(self.heap, val)
        if len(self.heap) > self.k:
            heapq.heappop(self.heap)
        return self.heap[0]
```

Here I use heapreplace to make the fixed capacity handling a little more idiomatic:
```python
import heapq


class KthLargest:
    def __init__(self, k, nums):
        self.k = k
        self.heap = nums
        heapq.heapify(self.heap)
        while len(self.heap) > k:
            heapq.heappop(self.heap)

    def add(self, val):
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, val)
        else:
            # Only replace the root if val is bigger, otherwise kth largest does not change
            if val > self.heap[0]:
                heapq.heapreplace(self.heap, val)
        return self.heap[0]
```

⭐️ Find Median from Data Stream (295)
For streams, we can usually consider a balancing approach: a max-heap for the lower half and a min-heap for the upper half, so the median sits at the roots.

```python
import heapq


class MedianFinder:
    def __init__(self):
        self.small = []  # Max heap (as negative numbers)
        self.large = []  # Min heap

    def addNum(self, num: int) -> None:
        # Python heapq is minheap, so push negative for maxheap
        heapq.heappush(self.small, -num)

        # All numbers in small must be <= numbers in large, so we take the
        # largest number in small and shift it to the large heap
        heapq.heappush(self.large, -heapq.heappop(self.small))

        # Balance: small can be larger by 1, so we rebalance if large is greater than small
        if len(self.large) > len(self.small):
            heapq.heappush(self.small, -heapq.heappop(self.large))

    def findMedian(self) -> float:
        # odd count of numbers seen so far:
        if len(self.small) > len(self.large):
            return -self.small[0]

        # even count of numbers seen so far:
        return (self.large[0] - self.small[0]) / 2.0
```

If we know that the values fall in a bounded range, we have the option of keeping frequency counts over that range and locating the median position from them.
```python
class MedianFinder:
    def __init__(self):
        self.freq = [0] * 101  # freq[i]: count of number i in [0,100]
        self.count = 0  # total number of inserted values

    def addNum(self, num: int) -> None:
        self.freq[num] += 1
        self.count += 1

    def findMedian(self) -> float:
        total = self.count

        # Find the median index(es) (0-based in sorted stream)
        if total % 2 == 1:
            median_pos1 = median_pos2 = total // 2
        else:
            median_pos1 = total // 2 - 1
            median_pos2 = total // 2

        # helps us account for the even cases where we'll have 2 values
        result = []
        running_count = 0
        for value in range(101):  # Loop over all possible values
            running_count += self.freq[value]

            # Add value to result if we cross the median positions
            if len(result) == 0 and running_count > median_pos1:
                result.append(value)
            if len(result) == 1 and running_count > median_pos2:
                result.append(value)
                break

        return sum(result) / len(result)
```

K Closest Points to Origin (973)
It's a Euclidean distance shortcut where we skip the sqrt: squared distance preserves the ordering, so it is a valid proxy metric that is cheaper to calculate.

Also, it's much simpler to use the heapq.nsmallest() API directly for this question.

```python
import heapq


class Solution:
    def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
        get_squared_dist = lambda p: p[0] ** 2 + p[1] ** 2
        return heapq.nsmallest(k, points, key=get_squared_dist)
```

Just for demo, here's a more handwritten one:

```python
import heapq


class Solution:
    def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
        # Maintain a max-heap of size k (heap stores (-distance, point))
        heap = []
        for p in points:
            dist = -(p[0] ** 2 + p[1] ** 2)
            if len(heap) < k:
                heapq.heappush(heap, (dist, p))
            else:
                # push the new point, then drop the farthest of the k + 1
                heapq.heappushpop(heap, (dist, p))
        return [item[1] for item in heap]
```
Merging Sorted Lists or Arrays #
- Objective
- Merge multiple sorted sequences or intervals efficiently using min-heaps.
Problems #
Here, heapq is used as an auxiliary data structure that makes it easy to choose the next provider (linked list) to take from. Both solutions follow.
Same concept as merging 2 sorted lists, just with k providers. We have two solutions: one that is space-optimal, with \(O(N \log k)\) time and \(O(1)\) extra space, and one that is not, with \(O(N \log k)\) time and \(O(k)\) space.
```python
from typing import List, Optional

# ListNode (with .val and .next) is the singly-linked list node supplied by the judge.


class Solution:
    def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
        if not lists:
            return None

        n = len(lists)
        interval = 1
        # pairwise merge lists that are `interval` apart, doubling the gap each round
        while interval < n:
            for i in range(0, n - interval, interval * 2):
                lists[i] = self.merge2Lists(lists[i], lists[i + interval])
            interval *= 2
        return lists[0] if n > 0 else None

    def merge2Lists(self, l1, l2):
        dummy = curr = ListNode()
        while l1 and l2:
            if l1.val < l2.val:
                curr.next, l1 = l1, l1.next
            else:
                curr.next, l2 = l2, l2.next

            # advance
            curr = curr.next

        curr.next = l1 or l2

        return dummy.next
```

The easiest way imo to get the best value each time is to just use a priority queue. I prefer the PQ solution.
```python
import heapq
from typing import List, Optional

# Definition for singly-linked list.
# class ListNode:
#     def __init__(self, val=0, next=None):
#         self.val = val
#         self.next = next


class Solution:
    def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
        # filters away the empty lists:
        # NOTE: id(ls) is a tie-breaker for the case where two nodes have the same value.
        # Without it, tuple comparison would fall through to comparing ListNode
        # objects, which raises a TypeError.
        providers = [(ls.val, id(ls), ls) for ls in lists if ls]
        heapq.heapify(providers)

        pre_head = dummy = ListNode()  # `dummy` walks forward as the tail of the merged list

        while providers:
            (val, _, ls) = heapq.heappop(providers)
            dummy.next = ls

            if ls.next:
                ls = ls.next
                new_val = ls.val
                heapq.heappush(providers, (new_val, id(ls), ls))

            dummy = dummy.next

        return pre_head.next
```

Merge Intervals (56) with heap variant
The usual solution is a linear scan over the sorted intervals, but we could rely on a heap version of this solution as well.
For the linear scan (after sorting the intervals by start time), we keep a working accumulator, the last merged interval, and keep comparing against it.
```python
from typing import List


class Solution:
    def merge(self, intervals: List[List[int]]) -> List[List[int]]:
        if not intervals:
            return []

        # Sort intervals based on start time
        intervals.sort(key=lambda x: x[0])

        merged_intervals = [intervals[0]]

        for start, end in intervals[1:]:
            last_merged_start, last_merged_end = merged_intervals[-1]

            if start <= last_merged_end:  # Overlapping intervals
                merged_intervals[-1][1] = max(last_merged_end, end)  # Merge
            else:
                merged_intervals.append([start, end])  # No overlap, add new

        return merged_intervals
```

In the heap-based approach, we push the intervals into a min-heap keyed on start time, then repeatedly pop the top interval and merge it into the running one.
```python
import heapq


class Solution:
    def merge(self, intervals: list[list[int]]) -> list[list[int]]:
        if not intervals:
            return []
        # Build a min-heap by the start time
        heap = [(start, end) for start, end in intervals]
        heapq.heapify(heap)

        result = []
        # Pop the first interval
        prev_start, prev_end = heapq.heappop(heap)

        while heap:
            curr_start, curr_end = heapq.heappop(heap)
            if curr_start <= prev_end:
                # Merge intervals
                prev_end = max(prev_end, curr_end)
            else:
                # No overlap, push previous interval to result
                result.append([prev_start, prev_end])
                prev_start, prev_end = curr_start, curr_end
        # Don't forget the last interval
        result.append([prev_start, prev_end])
        return result
```

This approach is no faster than sort-then-scan (both are \(O(n \log n)\)), but it is instructive as an alternative and can be more useful if the input is streaming or not already sorted.
Frequency and Top-K Elements #
- Objective
- Identify the top k most frequent elements in arrays or strings using priority queues for efficient counts management.
Problems #
There’s a couple of ways to do this since we’re asked for frequency counts.
The optimal approach, which runs in \(O(n)\) time, is bucket sort, which looks like this:

```python
from collections import Counter
from typing import List


class Solution:
    def topKFrequent(self, nums: List[int], k: int) -> List[int]:
        count = Counter(nums)
        # Bucket sort: index = frequency, value = list of numbers with that frequency
        max_possible_frequency = len(nums)
        buckets = [[] for _ in range(max_possible_frequency + 1)]

        for num, freq in count.items():
            buckets[freq].append(num)

        # Flatten buckets from high to low frequency
        res = []
        for freq in range(len(buckets) - 1, 0, -1):
            for num in buckets[freq]:
                res.append(num)
                if len(res) == k:
                    return res
        return res  # fewer than k distinct values
```

The other solutions are asymptotically slower. First, the heap-based solution, which is \(O(n \log k)\) where \(k\) is the number of elements requested:
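```python
import heapq
from collections import Counter
from typing import List


class Solution:
    def topKFrequent(self, nums: List[int], k: int) -> List[int]:
        count = Counter(nums)
        return [item for item, _ in heapq.nlargest(k, count.items(), key=lambda x: x[1])]
```

A Counter-only solution is the shortest. Called with no argument, most_common() sorts every entry in \(O(n \log n)\); called with an explicit k, CPython delegates to heapq.nlargest, so it is effectively the heap version above: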
```python
from collections import Counter
from typing import List


class Solution:
    def topKFrequent(self, nums: List[int], k: int) -> List[int]:
        return [elem for elem, _ in Counter(nums).most_common(k)]
```
Scheduling and Task Management #
- Objective
- Use priority queues for simulating task scheduling, managing intervals, and handling dynamic priorities.
- Classic Examples (often conceptual or system design related)
Problems #
This parallels actual scheduling algorithms: the idea is to maintain a running schedule (the heap) and keep retrieving the next item from it.

```python
import heapq


class Solution:
    def lastStoneWeight(self, stones: list[int]) -> int:
        stones = [-s for s in stones]  # API is a minheap, hence the negation
        heapq.heapify(stones)

        while len(stones) > 1:
            first = -heapq.heappop(stones)  # Largest
            second = -heapq.heappop(stones)  # Next largest

            if first != second:
                heapq.heappush(stones, -(first - second))

        return -stones[0] if stones else 0
```

Task Scheduling variations
CPU scheduling simulation problems
This can be solved with a time-simulation approach.
We use two things: a max-heap of schedulable tasks and a cooldown QUEUE for the tasks that are currently in cooldown. Tasks leave cooldown in FIFO order, hence the deque. I prefer this approach over the purely mathematical one.
```python
import heapq
from collections import Counter, deque
from typing import List


# TIME SIMULATION APPROACH
class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        freqs = Counter(tasks)
        pq = [(-cnt, task) for task, cnt in freqs.items()]
        heapq.heapify(pq)

        cooldown = deque()  # (ready_time, -cnt, task)
        time = 0

        while pq or cooldown:
            time += 1
            if pq:
                cnt, task = heapq.heappop(pq)
                if cnt + 1:  # counts are negated, so non-zero means more left
                    cooldown.append((time + n, cnt + 1, task))

            if cooldown and cooldown[0][0] == time:
                _, next_cnt, next_task = cooldown.popleft()
                heapq.heappush(pq, (next_cnt, next_task))

        return time
```

This particular question has a math approach too. The tasks with the maximum frequency are the bottleneck in the schedule: we need at least n intervals between repeats of the most common task. We fill the cooldown slots with other tasks if we can, otherwise we must idle. With \(f_{max}\) the highest frequency and \(n_{max}\) the number of tasks that share it, the schedule has \(f_{max} - 1\) full frames of length \(n + 1\) followed by a final partial frame of \(n_{max}\) tasks, which gives \((f_{max} - 1)(n + 1) + n_{max}\). If there are enough other tasks that no idling is needed, we fall back to the total number of tasks, so the answer is the larger of the two.
```python
from collections import Counter
from typing import List


class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        freqs = Counter(tasks)
        f_max = max(freqs.values())
        n_max = sum(1 for freq in freqs.values() if freq == f_max)

        part_len = f_max - 1  # number of full frames of length (n + 1)
        res = part_len * (n + 1) + n_max
        return max(res, len(tasks))
```
Online Median / Running Statistics #
- Objective
- Maintain running medians or generalized order-statistics efficiently on dynamically inserting and removing elements.
Problems #
- TODO Sliding Window Median (480)
- Data Stream Median or Quantile problems
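For the sliding-window variant, one common way to extend the two-heap median is to add lazy deletion: elements leaving the window are only marked, and are physically popped once they reach a heap root. The following is a minimal sketch under that assumption, not a reference solution; `DualHeap` and `sliding_window_median` are hypothetical names, and it assumes `1 <= k <= len(nums)`.

```python
import heapq
from collections import defaultdict


class DualHeap:
    """Two heaps plus lazy deletion; tracks the median of the live elements."""

    def __init__(self, k):
        self.k = k
        self.small = []  # max-heap (negated values) for the lower half
        self.large = []  # min-heap for the upper half
        self.delayed = defaultdict(int)  # value -> pending deletions
        self.small_size = 0  # live elements in small
        self.large_size = 0  # live elements in large

    def _prune(self, heap):
        # Physically remove roots that were already marked as deleted.
        while heap:
            num = -heap[0] if heap is self.small else heap[0]
            if self.delayed.get(num, 0) > 0:
                self.delayed[num] -= 1
                heapq.heappop(heap)
            else:
                break

    def _rebalance(self):
        # Invariant: small_size == large_size or small_size == large_size + 1.
        if self.small_size > self.large_size + 1:
            heapq.heappush(self.large, -heapq.heappop(self.small))
            self.small_size -= 1
            self.large_size += 1
            self._prune(self.small)
        elif self.small_size < self.large_size:
            heapq.heappush(self.small, -heapq.heappop(self.large))
            self.small_size += 1
            self.large_size -= 1
            self._prune(self.large)

    def insert(self, num):
        if not self.small or num <= -self.small[0]:
            heapq.heappush(self.small, -num)
            self.small_size += 1
        else:
            heapq.heappush(self.large, num)
            self.large_size += 1
        self._rebalance()

    def erase(self, num):
        # Mark num as deleted; pop it right away only if it sits at a root.
        self.delayed[num] += 1
        if num <= -self.small[0]:
            self.small_size -= 1
            if num == -self.small[0]:
                self._prune(self.small)
        else:
            self.large_size -= 1
            if num == self.large[0]:
                self._prune(self.large)
        self._rebalance()

    def median(self):
        if self.k % 2 == 1:
            return float(-self.small[0])
        return (-self.small[0] + self.large[0]) / 2


def sliding_window_median(nums, k):
    heaps = DualHeap(k)
    for num in nums[:k]:
        heaps.insert(num)
    result = [heaps.median()]
    for i in range(k, len(nums)):
        heaps.insert(nums[i])  # insert first so the heaps are never empty
        heaps.erase(nums[i - k])
        result.append(heaps.median())
    return result
```

The size counters track live elements only, which is why balancing decisions ignore the stale entries still sitting deeper in the heaps.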
Heap Sort and Selection Problems #
- Objective
- Use heaps for efficient sorting or selecting the nth smallest/largest element.
Problems #
Heap Sort algorithms
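As a reminder of what heap sort looks like without the heapq helpers, here is a minimal in-place sketch using a max-heap and a hand-written sift-down. The function name `heap_sort` is my own choice for illustration.

```python
def heap_sort(values):
    """Sort a list in place in ascending order using a max-heap."""
    n = len(values)

    def sift_down(root, end):
        # Restore the max-heap property for the subtree at `root`,
        # considering only values[:end].
        while True:
            child = 2 * root + 1
            if child >= end:
                return
            # pick the larger of the two children
            if child + 1 < end and values[child + 1] > values[child]:
                child += 1
            if values[root] >= values[child]:
                return
            values[root], values[child] = values[child], values[root]
            root = child

    # Build the max-heap bottom-up, starting from the last parent.
    for root in range(n // 2 - 1, -1, -1):
        sift_down(root, n)

    # Repeatedly move the current maximum to the end of the unsorted prefix.
    for end in range(n - 1, 0, -1):
        values[0], values[end] = values[end], values[0]
        sift_down(0, end)

    return values
```

The build phase is \(O(n)\) and the extraction phase is \(O(n \log n)\), with no extra storage beyond the input list.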
Kth Smallest Element In a Sorted Matrix
The heap implementation keeps a min-heap seeded with the first element of every row. We pop from it (k - 1) times, and after each pop we push the next element from the same row, if there is one.
Each pop removes the smallest value not yet consumed, so after (k - 1) pops the root of the heap is the kth smallest element.
```python
import heapq
from typing import List


class Solution:
    def kthSmallest(self, matrix: List[List[int]], k: int) -> int:
        n = len(matrix)

        # we init the heap with the first element from each row:
        heap = [(matrix[row][0], 0, row) for row in range(n)]  # (elem, column idx, row idx)

        heapq.heapify(heap)

        # now we extract from the min heap (k - 1) times so that the kth smallest element ends up at the root:
        for _ in range(k - 1):
            elem, col, row = heapq.heappop(heap)

            # add next element to heap, if exists:
            if col < (n - 1):
                new_elem = matrix[row][col + 1]
                heapq.heappush(heap, (new_elem, col + 1, row))

        return heap[0][0]
```
Interval and Range Merging with Heaps #
- Objective
- Maintain earliest ending intervals or ranges for merging or scheduling problems using min-heaps.
Problems #
Meeting Rooms II (253) (NC link)
We have 2 choices: we can use a min-heap over the interval end times, OR we can use the sweep-line algorithm. Here's my version with PQs:

```python
import heapq
from typing import List

# Interval is supplied by the judge: an object with .start and .end attributes.


class Solution:
    def minMeetingRooms(self, intervals: List[Interval]) -> int:
        if not intervals:
            return 0

        # Sort intervals by start time
        intervals.sort(key=lambda x: x.start)

        # Min-heap to track end times of meetings currently occupying rooms
        min_heap = []

        # Add the first meeting's end time
        heapq.heappush(min_heap, intervals[0].end)

        for interval in intervals[1:]:
            start, end = interval.start, interval.end

            # If the current meeting starts after or when a room is freed
            if start >= min_heap[0]:
                heapq.heappop(min_heap)  # Reuse room

            # Assign new room (or reused room with updated end time)
            heapq.heappush(min_heap, end)

        # Number of rooms = size of min-heap
        return len(min_heap)
```

And here's the sweep-line version:
```python
# SWEEP-LINE ALGO
def minMeetingRooms(intervals):
    if not intervals:
        return 0

    starts = sorted(i[0] for i in intervals)
    ends = sorted(i[1] for i in intervals)

    s, e = 0, 0
    rooms = 0
    max_rooms = 0

    while s < len(intervals):
        if starts[s] < ends[e]:
            rooms += 1
            max_rooms = max(max_rooms, rooms)
            s += 1
        else:
            rooms -= 1
            e += 1

    return max_rooms


# - Explanation:

#   - Separate all start times and end times into two sorted lists.

#   - Use pointers to iterate through both arrays, increment a counter when a meeting starts, decrement when one ends.

#   - Track the max counter during traversal → max concurrency → min rooms required.

#   - This approach avoids heaps and uses two sorted arrays, can be slightly faster for certain use-cases.
```

This is a merge-from-one-side problem. We can view it as splitting the cars, sorted by position, into contiguous segments based on their arrival times after merging. Each segment maps to exactly one car fleet. The top of the stack is the fleet directly ahead of the car under consideration, so we just check whether this car catches up with that fleet (arrives no later than it) or starts a new fleet.
```python
from typing import List


class Solution:
    def carFleet(self, target: int, position: List[int], speed: List[int]) -> int:
        # closest-to-target first
        cars = sorted(list(zip(position, speed)), reverse=True)
        arrival_time_stack = []

        for pos, s in cars:
            arrival_time = (target - pos) / s

            if not arrival_time_stack:  # empty stack
                arrival_time_stack.append(arrival_time)
                continue

            # if slower than the prev arrival, then it's a new fleet, else ignore this:
            if arrival_time_stack and arrival_time > arrival_time_stack[-1]:
                arrival_time_stack.append(arrival_time)

        return len(arrival_time_stack)
```

For completeness, here's a heap-based approach that is similar:
```python
import heapq
from typing import List


class Solution:
    def carFleet(self, target: int, position: List[int], speed: List[int]) -> int:
        # Build max-heap based on position (negate to simulate max-heap with Python's min-heap)
        heap = [(-pos, speed[i]) for i, pos in enumerate(position)]
        heapq.heapify(heap)

        fleets = 0
        last_arrival_time = 0.0

        # Process cars from closest to target (largest position) to farthest
        while heap:
            pos, spd = heapq.heappop(heap)
            pos = -pos  # revert negation to original position

            arrival_time = (target - pos) / spd

            # If current car's arrival time is greater than last fleet's time, it forms a new fleet
            if arrival_time > last_arrival_time:
                fleets += 1
                last_arrival_time = arrival_time
            # else, it merges into the fleet represented by last_arrival_time

        return fleets
```
Graph Algorithms using Heaps #
- Objective
Use heaps for efficient priority-based processing in graph algorithms like Dijkstra’s or Prim’s.
Refer directly to the Advanced Graph Sections below.
Problems #
This is just a classic Dijkstra: from a single source we find the shortest distance to every other node, and the answer is the largest of those distances (the last node to receive the signal). It applies to network-related problem domains as well. Note the 1-indexing shim (an unused slot at index 0) and the slice that skips it at the end.

```python
from collections import defaultdict
from typing import List
import heapq


class Solution:
    def networkDelayTime(self, times: List[List[int]], n: int, k: int) -> int:
        graph = defaultdict(list)
        for u, v, w in times:
            graph[u].append((v, w))

        # we're going to shim the 0th idx to handle the 1-indexing, then later ignore it
        dist = [float('inf')] * (n + 1)
        dist[k] = 0
        heap = [(0, k)]  # current dist, node (min-pq by distance)

        while heap:
            curr_dist, node = heapq.heappop(heap)
            # stale entry: a shorter path to this node was already found
            if curr_dist > dist[node]:
                continue

            for nei, w in graph[node]:
                # if can relax:
                if curr_dist + w < dist[nei]:
                    dist[nei] = curr_dist + w
                    heapq.heappush(heap, (dist[nei], nei))

        max_dist = max(dist[1:])
        return max_dist if max_dist != float('inf') else -1
```

Minimum Spanning Tree algorithms
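For the minimum spanning tree case, a heap plays the same role as in Dijkstra: it hands us the cheapest edge leaving the tree built so far. Below is a minimal sketch of Prim's algorithm under my own assumptions: nodes are numbered 0 to n - 1, edges are undirected `(u, v, w)` triples, and the hypothetical function `prim_mst_weight` returns `None` when the graph is disconnected.

```python
import heapq
from collections import defaultdict


def prim_mst_weight(n, edges):
    """Total weight of a minimum spanning tree, or None if the graph is disconnected."""
    graph = defaultdict(list)
    for u, v, w in edges:
        graph[u].append((w, v))
        graph[v].append((w, u))

    visited = set()
    heap = [(0, 0)]  # (weight of the edge used to reach node, node); start at node 0
    total = 0

    while heap and len(visited) < n:
        weight, node = heapq.heappop(heap)
        if node in visited:
            continue  # stale entry: node already joined the tree via a cheaper edge
        visited.add(node)
        total += weight
        for next_weight, nei in graph[node]:
            if nei not in visited:
                heapq.heappush(heap, (next_weight, nei))

    return total if len(visited) == n else None
```

The only real difference from the Dijkstra loop is the priority: the heap is keyed on the single edge weight rather than the accumulated distance from the source.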
Custom Comparator / Complex Priority Logic #
- Objective
- Implement priority queues with custom ordering or multi-key priorities.
Problems #
Any problem requiring lex order or multi-field ordering for tasks or objects
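Since heapq has no comparator argument, multi-field ordering is usually expressed through the shape of the heap entries. The sketch below shows two options I find handy: tuple keys (negating numeric fields that should sort descending) and a small wrapper class with `__lt__` for values that cannot be negated, such as strings. The names `order_tasks`, `ReverseStr` and `words_descending` are hypothetical, made up for this illustration.

```python
import heapq
from itertools import count


def order_tasks(tasks):
    """tasks: iterable of (priority, name).

    Pops by higher priority first, then by name ascending,
    then by insertion order for exact duplicates.
    """
    seq = count()  # insertion counter as the final tie-breaker
    heap = [(-priority, name, next(seq)) for priority, name in tasks]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1] for _ in range(len(heap))]


class ReverseStr:
    """Wrapper that inverts string ordering, so a min-heap pops the largest string first."""

    def __init__(self, value):
        self.value = value

    def __lt__(self, other):
        return self.value > other.value

    def __eq__(self, other):
        return self.value == other.value


def words_descending(words):
    heap = [ReverseStr(word) for word in words]
    heapq.heapify(heap)
    return [heapq.heappop(heap).value for _ in range(len(heap))]
```

For example, `order_tasks([(1, "b"), (2, "z"), (2, "a")])` pops the two priority-2 tasks first, in name order, before the priority-1 task.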
The idea here is to keep one time-ordered tweet list per user (each list acts as that user's queue), then merge the lists of the user and everyone they follow using heapq.merge():

```python
from collections import defaultdict
import heapq


class Twitter:
    def __init__(self):
        self.tweet_count = 0  # global counter doubles as a timestamp
        self.uid_to_tweets = defaultdict(list)
        self.uid_to_following = defaultdict(set)

    def postTweet(self, userId: int, tweetId: int) -> None:
        self.tweet_count += 1
        self.uid_to_tweets[userId].append((self.tweet_count, tweetId, userId))

    def getNewsFeed(self, userId: int) -> list[int]:
        posts = heapq.merge(*(self.uid_to_tweets[uid] for uid in self.uid_to_following[userId] | {userId}))
        return [post[1] for post in heapq.nlargest(10, posts)]

    def follow(self, followerId: int, followeeId: int) -> None:
        self.uid_to_following[followerId].add(followeeId)

    def unfollow(self, followerId: int, followeeId: int) -> None:
        self.uid_to_following[followerId].discard(followeeId)
```

However, this walks through every tweet of every followee. The better approach is a hand-rolled k-way merge that starts from each user's most recent tweet and stops after 10:
```python
from collections import defaultdict
import heapq


class Twitter:
    def __init__(self):
        self.time = 0
        self.user_tweets = defaultdict(list)
        self.following = defaultdict(set)

    def postTweet(self, userId: int, tweetId: int) -> None:
        self.time += 1
        self.user_tweets[userId].append((self.time, tweetId))

    def getNewsFeed(self, userId: int) -> list[int]:
        result = []
        # We'll build a heap of (-timestamp, tweetId, whose_list, index)
        heap = []
        users = self.following[userId].copy()
        users.add(userId)
        for uid in users:
            tweets = self.user_tweets[uid]
            if tweets:
                # push last tweet (most recent), track index
                ts, tid = tweets[-1]
                heapq.heappush(heap, (-ts, tid, uid, len(tweets) - 1))
        while heap and len(result) < 10:
            ts, tid, uid, idx = heapq.heappop(heap)
            result.append(tid)
            if idx > 0:
                prev_ts, prev_tid = self.user_tweets[uid][idx - 1]
                heapq.heappush(heap, (-prev_ts, prev_tid, uid, idx - 1))
        return result

    def follow(self, followerId: int, followeeId: int):
        if followerId != followeeId:
            self.following[followerId].add(followeeId)

    def unfollow(self, followerId: int, followeeId: int):
        self.following[followerId].discard(followeeId)
```
Approximate / Streaming Data Structures with Heaps #
- Objective
- Use approximate counting and priority heuristics for large or streaming datasets.
Problems #
- Count-Min Sketch with priority queues (more advanced/outside standard Leetcode scope)
SOE #
Reversed or wrong comparator in heap (min vs max). The default is min-heap ordering, so if we want a max-heap, we have to negate the values.
Forgetting lazy deletion or invalidation in PQ operations
Improper heapify or failure to maintain heap invariant
Remember that we can peek by just checking the first element in the heap; that's guaranteed to be the min element in the heap.
Sometimes a heap is overkill if we can get away with a quickselect, which finds the kth element in \(O(n)\) average time without maintaining any ordered structure.
Forgetting the heap invariants (we should only use the heap API for our needs; we shouldn't accidentally think that it's a sorted array).
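On the lazy deletion point: heapq cannot remove or re-prioritise an arbitrary element cheaply, so the usual workaround is to mark the old entry as dead and skip it when it surfaces at the root. A minimal sketch of that pattern follows; the class name `LazyPQ` and its methods are hypothetical.

```python
import heapq
from itertools import count


class LazyPQ:
    """Min-priority queue that supports remove/update through lazy invalidation."""

    def __init__(self):
        self._heap = []  # entries are [priority, seq, task, alive]
        self._entries = {}  # task -> its live entry
        self._seq = count()  # unique tie-breaker, so tasks are never compared

    def push(self, task, priority):
        # Re-pushing an existing task invalidates its old entry first.
        if task in self._entries:
            self.remove(task)
        entry = [priority, next(self._seq), task, True]
        self._entries[task] = entry
        heapq.heappush(self._heap, entry)

    def remove(self, task):
        # Mark as dead; the entry is physically dropped later in pop().
        entry = self._entries.pop(task)
        entry[3] = False

    def pop(self):
        while self._heap:
            priority, _, task, alive = heapq.heappop(self._heap)
            if alive:
                del self._entries[task]
                return task, priority
        raise KeyError("pop from an empty priority queue")
```

The trade-off is that dead entries stay in the heap until they reach the root, so the heap can hold more entries than there are live tasks.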
Decision Flow #
- Need streaming top-k? → Sliding window or heap
- Merging sorted data? → Min-heap merging
- Frequency based selections? → Heap top-k strategy
Styles, Tricks and Boilerplate #
Boilerplate #
Quickselect Algo #
The main part is the partition function; the rest is an intuitive understanding of the invariant it establishes (the pivot lands at its final sorted index), which is why repeatedly partitioning only the side that contains the target index is good enough.
Style: #
- let the heap do its job when you’re maintaining order statistics, just prune AFTER the heap does its job (e.g. after it gets added).
Tricks: #
Double heap approach is optimal for general data-streams. We can keep balancing them and we can rebalance when we add / remove to the overall system.
MATH TRICK: when dealing with Euclidean distances, we can just use the squared distances and avoid having to do the sqrt.
For stream median finding over a bounded range, a great idea is to represent the domain as a fixed-size array of frequency counts and then just locate (or adjust) a median pointer over those counts.
Python Recipes: #
- Heap order statistics functions:
We can pass in a lambda as the key function for nsmallest (it is a key, not a comparator).
E.g.

```python
return heapq.nsmallest(k, points, key=lambda p: p[0]**2 + p[1]**2)
```

Copying a heap preserves its heap properties because it's "just a list".

heapq.heappushpop() and heapq.heapreplace() are very useful:

```python
import heapq

heap = [5, 7, 9]
heapq.heapify(heap)

heapq.heappush(heap, 3)  # Add element
smallest = heapq.heappop(heap)  # Remove smallest element (3)

result = heapq.heappushpop(heap, 4)  # Push 4 then pop smallest (returns 4)
result2 = heapq.heapreplace(heap, 6)  # Pop smallest (returns 5) then push 6

top3 = heapq.nlargest(3, [1, 5, 2, 9, 7])  # [9, 7, 5]

merged = heapq.merge([1, 3, 5], [2, 4, 6])  # lazy iterator over 1, 2, 3, 4, 5, 6
```
TODO KIV #
[ ] Existing Canonicals #
- Online Median / Running Statistics:: TODO Sliding Window Median (480)
[ ] Fresh #
- see problems from leetcode’s list of questions here