Suppose we have a large amount of data (e.g. an array of values) that we want to process in parallel. The type of computation we want to perform is, for example, the sum of all values. We use a number of processes and distribute the array between them, each process computes the sum of its part, and at the end we just add up the values returned from each process. Easy.
A similar story if we want to compute the average of the array: we partition the array into chunks, send each chunk to a different process, calculate the average of each chunk, and then compute the average of the averages (taking into account the size of each chunk).
However, if we want to compute more complicated statistics such as the median (or any other percentile), this approach will not work.
Another problem we find when performing operations on data streams is the need to compute statistics or estimate the cardinality of a data set without having access to all the data at once (this is often called online statistics).
While it is straightforward to calculate averages on data streams, and algorithms exist to calculate approximate percentiles, if we also want to do it in a distributed system we need some more advanced and interesting algorithms.
In the following I describe a few memory-efficient algorithms that can be used in a distributed way, starting with cardinality estimators and ending with percentiles.
Bloom filters are space-optimized data structures designed to estimate set cardinalities as well as to determine, with a high degree of likelihood, whether they contain a specific element. Bloom filters work by mapping each added element to one or more bits in a bitmap. When an element is added to a Bloom filter, these bits (ideally just one bit per hash function) are set to 1.
Intuitively, this means that an n-bit input can be compressed down to a single bit. While an enormous amount of space can be saved using Bloom filters, the tradeoff is that a small possibility of false positives is introduced when determining if a given element has been added to it, because a single input element can potentially map to multiple bits.
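To make the bitmap idea concrete, here is a minimal toy implementation. The class name `TinyBloom`, the SHA-256-derived hash functions, and the default parameters are illustrative choices of mine, not the API of any particular library:

```python
import hashlib


class TinyBloom:
    """Toy Bloom filter: an m-bit bitmap with k hash functions."""

    def __init__(self, m=1024, k=3):
        self.m, self.k = m, k
        self.bits = 0  # the bitmap, stored as a Python integer

    def _positions(self, item):
        # derive k independent bit positions from salted SHA-256 hashes
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, item):
        for p in self._positions(item):
            self.bits |= 1 << p  # set the mapped bits to 1

    def __contains__(self, item):
        # "definitely not in the set" if any mapped bit is 0,
        # "probably in the set" if all mapped bits are 1
        return all(self.bits >> p & 1 for p in self._positions(item))
```

A false positive occurs exactly when all k positions of a never-added element happen to have been set by other elements, which is why the error rate grows as the bitmap fills up.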
Bloom filters can be organized in distributed data structures to perform fully decentralized computations of aggregate functions. Decentralized aggregation makes collective measurements locally available in every node of a distributed network without involving a centralized computational entity for this purpose.
```python
from bloom_filter import BloomFilter

bloom = BloomFilter(max_elements=10)
'testkey' in bloom  # False
bloom.add('testkey')
'testkey' in bloom  # True
```
Let’s do something more interesting. We can split the work between different Bloom filters and join them back at the end. Below we have a list of 94 counties in the U.K.
```python
counties = ("Aberdeen City,Aberdeenshire,Anglesey,Angus,Antrim,"
            "Argyll and Bute,Armagh,Bedfordshire,Breconshire,Buckinghamshire,"
            "Caernarvonshire,Cambridgeshire,Cardiganshire,Carmarthenshire,"
            "Cheshire,City of Edinburgh,Clackmannanshire,Cleveland,Cornwall,"
            "Cumbria,Denbighshire,Derbyshire,Derry and Londonderry,Devon,Dorset,"
            "Down,Dumfries and Galloway,Dundee City,Durham,East Ayrshire,"
            "East Dunbartonshire,East Lothian,East Renfrewshire,East Sussex,"
            "Eilean Siar,Essex,Falkirk,Fermanagh,Fife,Flintshire,Glamorgan,"
            "Glasgow City,Gloucestershire,Greater London,Greater Manchester,"
            "Hampshire,Hertfordshire,Highland,Inverclyde,Kent,Lancashire,"
            "Leicestershire,Lincolnshire,Merionethshire,Merseyside,Midlothian,"
            "Monmouthshire,Montgomeryshire,Moray,Norfolk,North Ayrshire,"
            "North Lanarkshire,North Yorkshire,Northamptonshire,Northumberland,"
            "Nottinghamshire,Orkney Islands,Oxfordshire,Pembrokeshire,"
            "Perth and Kinross,Radnorshire,Renfrewshire,Scottish Borders,"
            "Shetland Islands,Shropshire,Somerset,South Ayrshire,South Lanarkshire,"
            "South Yorkshire,Staffordshire,Stirling,Suffolk,Surrey,Tyne and Wear,"
            "Tyrone,Warwickshire,West Berkshire,West Dunbartonshire,West Lothian,"
            "West Midlands,West Sussex,West Yorkshire,Wiltshire,"
            "Worcestershire").split(',')
```
We split the list in two and feed each sublist to a bloom filter.
```python
bloom1 = BloomFilter(max_elements=100)
bloom2 = BloomFilter(max_elements=100)

for key in counties[:42]:
    bloom1.add(key)
for key in counties[42:]:
    bloom2.add(key)

'Oxfordshire' in bloom1     # False
'Oxfordshire' in bloom2     # True
'Cambridgeshire' in bloom1  # True
'Cambridgeshire' in bloom2  # False
```
Both Bloom filters are then combined into one for the final result:
```python
bloom1 |= bloom2

'Oxfordshire' in bloom1     # True
'Cambridgeshire' in bloom1  # True
```
The difference with classical data structures is that a Bloom filter is a probabilistic data structure: it tells us that an element either is definitely not in the set or may be in the set. The benefit is that it is fast and memory-efficient.
A nice and simple explanation of Bloom filters can be found at http://llimllib.github.io/bloomfilter-tutorial/
The count–min sketch (CM sketch) is a probabilistic data structure that serves as a frequency table of events in a stream of data. It uses hash functions to map events to frequencies, but unlike a hash table uses only sub-linear space, at the expense of overcounting some events due to collisions.
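A minimal sketch of the idea in Python — the class name, the salted-SHA-256 hashing scheme, and the width/depth defaults are my own illustrative choices, not any particular library's API:

```python
import hashlib


class CountMinSketch:
    """Toy count-min sketch: d rows of w counters, one hash per row."""

    def __init__(self, w=1000, d=4):
        self.w, self.d = w, d
        self.table = [[0] * w for _ in range(d)]

    def _hash(self, row, item):
        # a distinct hash function per row, derived by salting SHA-256
        h = hashlib.sha256(f"{row}:{item}".encode()).digest()
        return int.from_bytes(h[:8], "big") % self.w

    def add(self, item, count=1):
        for row in range(self.d):
            self.table[row][self._hash(row, item)] += count

    def estimate(self, item):
        # taking the minimum over rows limits the damage from collisions;
        # the estimate can over-count but never under-count
        return min(self.table[row][self._hash(row, item)]
                   for row in range(self.d))
```

Like the other sketches in this post, two count-min sketches with the same dimensions can be merged for distributed use by adding their tables element-wise.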
HyperLogLog is a data structure and algorithm combination that, similarly to Bloom filters, is designed to estimate the cardinality of sets with a very high degree of accuracy. In terms of functionality, HyperLogLog only supports adding elements and estimating the cardinality of the set of all elements that have been added. They do not support membership checks of specific elements like Bloom filters do. However, they are drastically more space efficient than Bloom filters.
HyperLogLog works by subdividing its input stream of added elements and storing the maximum number of leading zeros that have been observed within each subdivision. Since elements are uniformly hashed before checking for the number of leading zeros, the general idea is that the greater the number of leading zeros observed, the higher the probability that many unique elements have been added. Empirically, this estimation turns out to be very accurate.
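The scheme can be sketched in a short toy implementation. Everything below — the class name, the SHA-1 hashing, the register count, and the simple linear-counting correction for small cardinalities — is an illustrative simplification of the published algorithm, not production code:

```python
import hashlib
import math


class HyperLogLog:
    """Toy HyperLogLog with 2**b registers."""

    def __init__(self, b=12):
        self.b = b
        self.m = 1 << b                     # number of registers
        self.registers = [0] * self.m
        # bias-correction constant (approximation valid for m >= 128)
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, item):
        # hash uniformly to 64 bits; first b bits pick the register
        h = int.from_bytes(hashlib.sha1(str(item).encode()).digest()[:8], 'big')
        idx = h >> (64 - self.b)
        rest = h & ((1 << (64 - self.b)) - 1)
        # rank = position of the leftmost 1-bit in the remaining bits
        rank = (64 - self.b) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def estimate(self):
        e = self.alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        # small-range correction: fall back to linear counting
        zeros = self.registers.count(0)
        if e <= 2.5 * self.m and zeros:
            e = self.m * math.log(self.m / zeros)
        return e

    def __ior__(self, other):
        # merging is a register-wise max, which is what makes
        # HyperLogLog usable in a distributed setting
        self.registers = [max(a, b)
                          for a, b in zip(self.registers, other.registers)]
        return self
```

Because merging two sketches gives exactly the registers you would have obtained by feeding all elements into a single sketch, the estimate is independent of how the stream was partitioned.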
T–Digest is a data structure that supports very accurate estimations of rank-based statistics such as percentiles and medians while only using a constant amount of space. Space efficiency at the expense of a small margin of error makes T-Digest well-suited for rank-based computations on streams, which normally require their input to be finite and ordered for perfect accuracy. T-Digest is essentially an adaptive histogram that intelligently adjusts its buckets and frequencies as more elements are added to it.
```python
import numpy as np
from tdigest import TDigest

digest = TDigest()
for x in range(5000):
    digest.update(np.random.normal(loc=0.0, scale=1.0))

digest.percentile(75)  # 0.6814319117373359
```
```python
another_digest = TDigest()
another_digest.batch_update(np.random.normal(loc=0.0, scale=1.0, size=5000))
another_digest.percentile(75)  # 0.6708989573866105
```
```python
sum_digest = digest + another_digest
sum_digest.percentile(75)  # 0.677216233877676
```
Let’s just check by calculating the standard deviation from the percentiles[^1]:
```python
for name, d in [('digest', digest), ('another_digest', another_digest),
                ('sum_digest', sum_digest)]:
    std = (d.percentile(75) - d.percentile(25)) / 2 / 0.675
    print(f'{name} std: {std}')
# digest std: 0.9851971505338745
# another_digest std: 0.99576507833170025
# sum_digest std: 0.9888204940806985
```
[^1]: The relationship between the standard deviation and the percentiles is given by $X = \mu + Z \sigma$, where $X$ is the percentile and $Z$ is the corresponding quantile of a normal distribution with $\sigma=1$, i.e. $Z=-0.675$ for the first quartile and $Z=0.675$ for the third quartile.