Data streaming in Python: generators, iterators, iterables

Radim Řehůřek · gensim, programming

There are tools and concepts in computing that are very powerful but potentially confusing even to advanced users. One such concept is data streaming (aka lazy evaluation), which can be realized neatly and natively in Python. Do you know when and how to use generators, iterators and iterables?

One at a Time: Out of the door, line on the left, one cross each

Without getting too academic (continuations! coroutines!), the iteration pattern simply allows us to go over a sequence without materializing all its items explicitly at once:

numbers = range(100000)

# Example 1, list comprehension. No streaming.
# First create a list of squares, then sum it.
# Note the inner list is simply looped over: no random access, just iteration.
# Wasteful, isn't it?
sum([n**2 for n in numbers])
333328333350000

# Generator: square and sum one value after another
# No extra list created = lazily evaluated stream of numbers!
sum(n**2 for n in numbers)
333328333350000

I’ve seen people argue over which of the two approaches is faster, posting silly micro-second benchmarks. In any serious data processing, the language overhead of either approach is a rounding error compared to the costs of actually generating and processing the data. The true power of iterating over sequences lazily is in saving memory. Plus, you can feed generators as input to other generators, creating long, data-driven pipelines, with sequence items pulled and processed as needed.
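Here is a minimal sketch of such a pipeline (the function and file names are just for illustration, not from any library): each stage is a generator that pulls items from the previous one.

import re

def iter_lines(path):
    """Yield lines from a file one at a time, without reading it all into RAM."""
    with open(path) as fin:
        for line in fin:
            yield line

def iter_tokens(lines):
    """Yield lowercased word tokens from a stream of lines."""
    for line in lines:
        for token in re.findall(r'\w+', line.lower()):
            yield token

# chain the generators; nothing is read or tokenized until iteration starts
tokens = iter_tokens(iter_lines('some_large_file.txt'))
print(sum(1 for _ in tokens))  # count the tokens, pulling them one at a time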

Lazy data pipelines are like Inception, except things don’t get automatically faster by going deeper.

The iteration pattern is also extremely handy (necessary?) when you don’t know how much data you’ll have in advance, and can’t wait for all of it to arrive before you start processing it. Imagine a simulator producing gigabytes of data per second. Clearly we can’t put everything neatly into a Python list first and then start munching — we must process the information as it comes in. Python’s built-in iteration support to the rescue!
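Here is a tiny sketch of that idea (illustrative code only, not a real library API): consume an unbounded stream of measurements from standard input, in constant memory.

import sys

def iter_measurements(stream=sys.stdin):
    """Yield one floating-point measurement per input line, as the lines arrive."""
    for line in stream:
        line = line.strip()
        if line:
            yield float(line)

# keep running statistics over the stream; no list is ever materialized
total, count = 0.0, 0
for value in iter_measurements():
    total += value
    count += 1
    if count % 100000 == 0:  # report progress now and then
        print('running mean after %d values: %.3f' % (count, total / count))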

Generators, iterators, iterables

This generators vs. iterables vs. iterators business can be a bit confusing: the iterator is the thing we ultimately care about, an object that manages a single pass over a sequence. Both iterables and generators produce an iterator, allowing us to do “for record in iterable_or_generator: …” without worrying about the nitty-gritty of keeping track of where we are in the stream, how to get to the next item, or how to stop iterating.
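In other words, a for loop simply calls iter() on the object and then keeps calling next() on the resulting iterator, until next() raises StopIteration. A bare-bones sketch of what happens under the hood:

words = 'baby let me iterate ya'.split()  # a list is an iterable

iterator = iter(words)  # what the for loop does first; same as words.__iter__()
print(next(iterator))   # baby
print(next(iterator))   # let
# ...and so on, until next() raises StopIteration and the loop ends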

The difference between iterables and generators: once you’ve burned through a generator, you’re done; no more data:

generator = (word + '!' for word in 'baby let me iterate ya'.split())
# The generator object is now created, ready to be iterated over.
# No exclamation marks added yet at this point.

for val in generator: # real processing happens here, during iteration
    print(val, end=' ')
baby! let! me! iterate! ya!

for val in generator:
    print(val, end=' ')
# Nothing printed! No more data, generator stream already exhausted above.

On the other hand, an iterable creates a new iterator every time it’s looped over (technically, every time iterable.__iter__() is called, such as when Python hits a “for” loop):

class BeyonceIterable(object):
    def __iter__(self):
        """
        The iterable interface: return an iterator from __iter__().

        Every generator is an iterator implicitly (but not vice versa!),
        so implementing `__iter__` as a generator is the easiest way
        to create streamed iterables.

        """
        for word in 'baby let me iterate ya'.split():
            yield word + '!'  # uses yield => __iter__ is a generator

iterable = BeyonceIterable()

for val in iterable:  # iterator created here
    print(val, end=' ')
baby! let! me! iterate! ya!

for val in iterable:  # another iterator created here
    print(val, end=' ')
baby! let! me! iterate! ya!

So iterables are more universally useful than generators, because we can go over the sequence more than once. Of course, when your data stream comes from a source that cannot be readily repeated (such as hardware sensors), a single pass via a generator may be your only option.

Give a man a fish and you feed him for a day…

People familiar with functional programming are probably shuffling their feet impatiently. Let’s move on to a more practical example: feed documents into the gensim topic modelling software, in a way that doesn’t require you to load the entire text corpus into memory:

import os

import gensim

def iter_documents(top_directory):
    """
    Generator: iterate over all relevant documents, yielding one
    document (= a stream of utf8 tokens) at a time.
    """
    # find all .txt documents, no matter how deep under top_directory
    for root, dirs, files in os.walk(top_directory):
        for fname in filter(lambda fname: fname.endswith('.txt'), files):
            # read each document as one big string
            document = open(os.path.join(root, fname)).read()
            # break document into utf8 tokens
            yield gensim.utils.tokenize(document, lower=True, errors='ignore')

class TxtSubdirsCorpus(object):
    """
    Iterable: on each iteration, return bag-of-words vectors,
    one vector for each document.

    Process one document at a time using generators, never
    load the entire corpus into RAM.

    """
    def __init__(self, top_dir):
        self.top_dir = top_dir
        # create dictionary = mapping for documents => sparse vectors
        self.dictionary = gensim.corpora.Dictionary(iter_documents(top_dir))

    def __iter__(self):
        """
        Again, __iter__ is a generator => TxtSubdirsCorpus is a streamed iterable.
        """
        for tokens in iter_documents(self.top_dir):
            # transform tokens (strings) into a sparse vector, one at a time
            yield self.dictionary.doc2bow(tokens)

# that's it! the streamed corpus of sparse vectors is ready
corpus = TxtSubdirsCorpus('/home/radim/corpora/')

# print the corpus vectors
for vector in corpus:
    print(vector)

# or run truncated Singular Value Decomposition (SVD) on the streamed corpus
from gensim.models.lsimodel import stochastic_svd as svd
u, s = svd(corpus, rank=200, num_terms=len(corpus.dictionary), chunksize=5000)

Some algorithms work better when they can process larger chunks of data (such as 5,000 records) at once, instead of going record-by-record. With a streamed API, mini-batches are trivial: pass around streams and let each algorithm decide how large a chunk it needs, grouping records internally. In the example above, I gave the stochastic SVD algo a hint with chunksize=5000, telling it to process its input stream in groups of 5,000 vectors. With more RAM available, or with shorter documents, I could have told the online SVD algorithm to progress in mini-batches of 1 million documents at a time.
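The grouping helper itself is only a few lines with the standard library. One possible sketch (the grouper function below is for illustration; it is not gensim's internal implementation):

import itertools

def grouper(stream, chunksize):
    """Lazily yield lists of up to `chunksize` items from any iterable."""
    iterator = iter(stream)
    while True:
        chunk = list(itertools.islice(iterator, chunksize))
        if not chunk:
            break
        yield chunk

for chunk in grouper(corpus, 5000):
    print('got a mini-batch of %d sparse vectors' % len(chunk))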

…teach a man to fish and you feed him for a lifetime

The corpus above looks for .txt files under a given directory, treating each file as one document. What if you didn’t know this implementation but wanted to find all .rst files instead? Or search only inside a single dir, instead of all nested subdirs? Treat each file line as an individual document?

One option would be to expect gensim to introduce classes like RstSubdirsCorpus and TxtLinesCorpus and TxtLinesSubdirsCorpus, possibly abstracting the combinations of choices with a special API and optional parameters.

That’s what I call “API bondage” (I may blog about that later!).

Hiding implementations behind abstractions and fancy method names to remember, for things that can be achieved with a few lines of concise, native, universal syntax, is a bad idea.

The Java world especially seems prone to API bondage.

In gensim, it’s up to you how you create the corpus. Gensim algorithms only care that you supply them with an iterable of sparse vectors (and for some algorithms, even a generator = a single pass over the vectors is enough). You don’t have to use gensim’s Dictionary class to create the sparse vectors. You don’t even have to use streams — a plain Python list is an iterable too! So screw lazy evaluation, load everything into RAM as a list if you like. Or a NumPy matrix.

The streaming corpus example above is a dozen lines of code. I find that ousting small, niche I/O format classes like these into user space is an acceptable price for keeping the library itself lean and flexible.
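For example, the hypothetical “each line is one document” variant from above is just another tiny iterable in user code. A sketch (not an official gensim class; it assumes gensim is imported as above and reuses an existing Dictionary):

class TxtLinesCorpus(object):
    """
    Iterable: treat each line of a single text file as one document,
    streaming bag-of-words vectors without loading the file into RAM.
    """
    def __init__(self, fname, dictionary):
        self.fname = fname
        self.dictionary = dictionary  # reuse an existing word <=> id mapping

    def __iter__(self):
        with open(self.fname) as fin:
            for line in fin:
                tokens = gensim.utils.tokenize(line, lower=True, errors='ignore')
                yield self.dictionary.doc2bow(tokens)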

I’m hoping people realize how straightforward and joyful data processing in Python is, even in the presence of more advanced concepts like lazy processing. Use built-in tools and interfaces where possible, and say no to API bondage!

Comments

  1. MrProgrammer

    “data streaming (aka lazy evaluation),”

    Data streaming and lazy evaluation are not the same thing.

  2. Aman

    Hi Radim,

    I’m a little confused at line 26 in TxtSubdirsCorpus class, Does gensim.corpora.Dictionary() method implements a for loop to iterate over the generator returned by iter_documents() function?
    Also, at line 32 in the same class, iter_documents() return a tokenized document(a list), so, “for tokens in iter_documents()” essentially iterates over all the tokens in the returned document, or for is just an iterator for iter_documents generator?
    Thanks for the tutorial.

  3. Fritz Venter

    You may want to consider a ‘with’ statement as follows:

    with open(os.path.join(root, fname)) as document:
        # break document into utf8 tokens
        yield gensim.utils.tokenize(document.read(), lower=True, errors='ignore')

    This will ensure that the file is closed even when an exception occurs. See: Example 2 at the end of https://www.python.org/dev/peps/pep-0343/

  4. Rob Wilson

    Do you have a code example of a python api that streams data from a database and into the response?

  5. Janmajay Singh

    Although this post is really old, I hope I get a reply.
    You say that each time the interpreter hits a for loop, iterable.__iter__() is implicitly called and it results in a new iterator object. Each iterator is a generator.
    This is also explained the reason why we can iterate over the sequence more than once.
    My question is:
    If I do an id(iterable.__iter__()) inside each for loop, it returns the same memory address. Wouldn’t that mean that it is the same object? Can you please explain?

  6. Maryam

    hi there,
    thank you for the tutorial,
    in fact, I wanna to apply google pre trained word2vec through this codes:

    model = gensim.models.KeyedVectors.load_word2vec_format('./GoogleNews-vectors-negative300.bin', binary=True) # load the whole embedding into memory using word2vec
    model.save_word2vec_format('./GoogleNews-vectors-negative300.txt', binary=true)
    embeddings_index = dict()
    f = open('GoogleNews-vectors-negative300.bin')
    but gave me memory error
    how can i deal with this error ??
    any guidance will be appreciated.
