Welcome to Flupy¶
flupy is a lightweight library and CLI for implementing Python data pipelines with a fluent interface.
Under the hood, flupy is built on generators. That means its pipelines evaluate lazily and use a constant amount of memory no matter how much data is being processed. This allows flupy to tackle petabyte-scale data manipulation as easily as it operates on a small list.
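That laziness is plain generator mechanics, which can be sketched with the standard library alone (no flupy required): each stage wraps the previous one, and nothing is computed until the pipeline is consumed.

```python
import itertools

# An unbounded source: values are produced one at a time, so no
# intermediate list is ever materialized.
numbers = itertools.count()              # 0, 1, 2, ...

# Each stage lazily wraps the previous one; nothing runs yet.
squares = (n * n for n in numbers)
evens = (n for n in squares if n % 2 == 0)

# Only consuming the pipeline pulls data through it.
first_three = list(itertools.islice(evens, 3))
print(first_three)  # [0, 4, 16]
```

flupy's fluent methods chain in exactly the same way, which is why memory use stays constant regardless of input size.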
API¶
import json
from flupy import flu
logs = open('logs.jl', 'r')
error_count = (
flu(logs)
.map(lambda x: json.loads(x))
.filter(lambda x: x['level'] == 'ERROR')
.count()
)
print(error_count)
# 14
CLI¶
The flupy library, and the Python runtime, are also accessible from the flu command line utility:
$ cat logs.txt | flu "_.filter(lambda x: x.startswith('ERROR'))"
For more information about the flu command see command line.
Example¶
Since 2008, which domains are our customers coming from?
from flupy import flu
customers = [
{'name': 'Jane', 'signup_year': 2018, 'email': 'jane@ibm.com'},
{'name': 'Fred', 'signup_year': 2011, 'email': 'fred@google.com'},
{'name': 'Lisa', 'signup_year': 2014, 'email': 'lisa@ibm.com'},
{'name': 'Jack', 'signup_year': 2007, 'email': 'jack@apple.com'},
]
pipeline = (
flu(customers)
.filter(lambda x: x['signup_year'] > 2008)
.map_item('email')
.map(lambda x: x.partition('@')[2])
.group_by() # defaults to identity
.map(lambda x: (x[0], x[1].count()))
.collect()
)
print(pipeline)
# [('google.com', 1), ('ibm.com', 2)]
API Reference¶
Container¶
Grouping¶
-
flu.
chunk
(n: int) → flupy.fluent.Fluent[typing.List[~T]][List[T]]¶ Yield lists of elements from iterable in groups of n
If the iterable is not evenly divisible by n, the final list will be shorter.
>>> flu(range(10)).chunk(3).to_list() [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
-
flu.
flatten
(depth: int = 1, base_type: Type[object] = None, iterate_strings: bool = False) → flupy.fluent.Fluent[typing.Any][Any]¶ Recursively flatten nested iterables (e.g., a list of lists of tuples) into non-iterable type or an optional user-defined base_type
Strings are treated as non-iterable for convenience. Set iterate_strings=True to change that behavior.
>>> flu([[0, 1, 2], [3, 4, 5]]).flatten().to_list() [0, 1, 2, 3, 4, 5]
>>> flu([[0, [1, 2]], [[3, 4], 5]]).flatten().to_list() [0, [1, 2], [3, 4], 5]
>>> flu([[0, [1, 2]], [[3, 4], 5]]).flatten(depth=2).to_list() [0, 1, 2, 3, 4, 5]
>>> flu([1, (2, 2), 4, [5, (6, 6, 6)]]).flatten(base_type=tuple).to_list() [1, (2, 2), 4, 5, (6, 6, 6)]
>>> flu([[2, 0], 'abc', 3, [4]]).flatten(iterate_strings=True).to_list() [2, 0, 'a', 'b', 'c', 3, 4]
-
flu.
denormalize
(iterate_strings: bool = False) → flupy.fluent.Fluent[typing.Tuple[typing.Any, ...]][Tuple[Any, ...]]¶ Denormalize iterable components of each record
>>> flu([("abc", [1, 2, 3])]).denormalize().to_list() [('abc', 1), ('abc', 2), ('abc', 3)]
>>> flu([("abc", [1, 2])]).denormalize(iterate_strings=True).to_list() [('a', 1), ('a', 2), ('b', 1), ('b', 2), ('c', 1), ('c', 2)]
>>> flu([("abc", [])]).denormalize().to_list() []
-
flu.
group_by
(key: Callable[[T], Union[T, _T1]] = <function identity>, sort: bool = True) → flupy.fluent.Fluent[typing.Tuple[typing.Union[~T, ~_T1], flupy.fluent.Fluent[~T]]][Tuple[Union[T, _T1], flupy.fluent.Fluent[~T][T]]]¶ Yield consecutive keys and groups from the iterable
key is a function to compute a key value used in grouping and sorting for each element. key defaults to an identity function which returns the unchanged element
When the iterable is pre-sorted according to key, setting sort to False will prevent loading the dataset into memory and improve performance
>>> flu([2, 4, 2, 4]).group_by().to_list()
[(2, <flu object>), (4, <flu object>)]
Or, if the iterable is pre-sorted
>>> flu([2, 2, 5, 5]).group_by(sort=False).to_list() [(2, <flu object>), (5, <flu object>)]
Using a key function
>>> points = [{'x': 1, 'y': 0}, {'x': 4, 'y': 3}, {'x': 1, 'y': 5}]
>>> key_func = lambda u: u['x']
>>> flu(points).group_by(key=key_func, sort=True).to_list()
[(1, <flu object>), (4, <flu object>)]
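The consecutive-key behavior, and the memory cost of sort=True, mirror the standard library's itertools.groupby, which likewise merges only adjacent equal keys; a stdlib illustration:

```python
from itertools import groupby

data = [2, 4, 2, 4]

# Without sorting, only *consecutive* equal keys are merged, so the
# same key can appear in several groups.
unsorted_groups = [(k, list(g)) for k, g in groupby(data)]
print(unsorted_groups)  # [(2, [2]), (4, [4]), (2, [2]), (4, [4])]

# Sorting first (which buffers the whole dataset in memory) yields one
# group per key -- this is the trade-off sort=True makes.
sorted_groups = [(k, list(g)) for k, g in groupby(sorted(data))]
print(sorted_groups)  # [(2, [2, 2]), (4, [4, 4])]
```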
-
flu.
window
(n: int, step: int = 1, fill_value: Any = None) → flupy.fluent.Fluent[typing.Tuple[typing.Any, ...]][Tuple[Any, ...]]¶ Yield a sliding window of width n over the given iterable.
Each window will advance in increments of step:
If the length of the iterable is not evenly divisible by step, the final window is padded with fill_value.
>>> flu(range(5)).window(3).to_list() [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
>>> flu(range(5)).window(n=3, step=2).to_list() [(0, 1, 2), (2, 3, 4)]
>>> flu(range(9)).window(n=4, step=3).to_list() [(0, 1, 2, 3), (3, 4, 5, 6), (6, 7, 8, None)]
>>> flu(range(9)).window(n=4, step=3, fill_value=-1).to_list() [(0, 1, 2, 3), (3, 4, 5, 6), (6, 7, 8, -1)]
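As an illustration of the technique (not flupy's internal implementation), a sliding window with this step-and-padding behavior can be sketched with collections.deque:

```python
from collections import deque
from itertools import islice

def window(iterable, n, step=1, fill_value=None):
    """Yield width-n tuples advancing by step, padding the tail."""
    it = iter(iterable)
    win = deque(islice(it, n), maxlen=n)
    if not win:
        return
    while len(win) < n:          # pad a short first window
        win.append(fill_value)
    yield tuple(win)
    while True:
        new = list(islice(it, step))
        if not new:
            return
        new.extend([fill_value] * (step - len(new)))
        win.extend(new)          # maxlen=n drops the oldest items
        yield tuple(win)

print(list(window(range(5), 3)))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
print(list(window(range(9), n=4, step=3)))
# [(0, 1, 2, 3), (3, 4, 5, 6), (6, 7, 8, None)]
```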
Selecting¶
-
flu.
filter
(func: Callable[[...], bool], *args, **kwargs) → flupy.fluent.Fluent[~T][T]¶ Yield elements of iterable where func returns truthy
>>> flu(range(10)).filter(lambda x: x % 2 == 0).to_list() [0, 2, 4, 6, 8]
-
flu.
take
(n: Optional[int] = None) → flupy.fluent.Fluent[~T][T]¶ Yield first n items of the iterable
>>> flu(range(10)).take(2).to_list() [0, 1]
-
flu.
take_while
(predicate: Callable[[T], bool]) → flupy.fluent.Fluent[~T][T]¶ Yield elements from the chainable so long as the predicate is true
>>> flu(range(10)).take_while(lambda x: x < 3).to_list() [0, 1, 2]
-
flu.
drop_while
(predicate: Callable[[T], bool]) → flupy.fluent.Fluent[~T][T]¶ Drop elements from the chainable as long as the predicate is true; afterwards, return every element
>>> flu(range(10)).drop_while(lambda x: x < 3).to_list() [3, 4, 5, 6, 7, 8, 9]
-
flu.
unique
(key: Callable[[T], Hashable] = <function identity>) → flupy.fluent.Fluent[~T][T]¶ Yield elements that are unique by a key.
>>> flu([2, 3, 2, 3]).unique().to_list() [2, 3]
>>> flu([2, -3, -2, 3]).unique(key=abs).to_list() [2, -3]
Transforming¶
-
flu.
enumerate
(start: int = 0) → flupy.fluent.Fluent[typing.Tuple[int, ~T]][Tuple[int, T]]¶ Yields tuples from the instance where the first element is a count from initial value start.
>>> flu([3,4,5]).enumerate().to_list() [(0, 3), (1, 4), (2, 5)]
-
flu.
join_left
(other: Iterable[_T1], key: Callable[[T], Hashable] = <function identity>, other_key: Callable[[_T1], Hashable] = <function identity>) → flupy.fluent.Fluent[typing.Tuple[~T, typing.Union[~_T1, NoneType]]][Tuple[T, Optional[_T1]]]¶ Join the iterable with another iterable using equality between key applied to self and other_key applied to other to identify matching entries
When no matching entry is found in other, entries in the iterable are paired with None
Note: join_left loads other into memory
>>> flu(range(6)).join_left(range(0, 6, 2)).to_list() [(0, 0), (1, None), (2, 2), (3, None), (4, 4), (5, None)]
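The memory note follows from how a streaming left join has to work: other is indexed up front, while the left side still streams one element at a time. A stdlib sketch (the helper name and its first-match-wins behavior are illustrative assumptions, not flupy internals):

```python
def join_left(left, other, key=lambda x: x, other_key=lambda x: x):
    # Build an in-memory index of `other` -- the part the note warns
    # about. `left` itself is still consumed lazily.
    index = {}
    for item in other:
        index.setdefault(other_key(item), item)  # first match wins
    for item in left:
        yield (item, index.get(key(item)))       # None when unmatched

result = list(join_left(range(6), range(0, 6, 2)))
print(result)  # [(0, 0), (1, None), (2, 2), (3, None), (4, 4), (5, None)]
```

An inner join is the same sketch with unmatched pairs filtered out instead of padded with None.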
-
flu.
join_inner
(other: Iterable[_T1], key: Callable[[T], Hashable] = <function identity>, other_key: Callable[[_T1], Hashable] = <function identity>) → flupy.fluent.Fluent[typing.Tuple[~T, ~_T1]][Tuple[T, _T1]]¶ Join the iterable with another iterable using equality between key applied to self and other_key applied to other to identify matching entries
When no matching entry is found in other, entries in the iterable are filtered from the results
Note: join_inner loads other into memory
>>> flu(range(6)).join_inner(range(0, 6, 2)).to_list() [(0, 0), (2, 2), (4, 4)]
-
flu.
map
(func: Callable[[T], _T1], *args, **kwargs) → flupy.fluent.Fluent[~_T1][_T1]¶ Apply func to each element of iterable
>>> flu(range(5)).map(lambda x: x*x).to_list() [0, 1, 4, 9, 16]
-
flu.
map_attr
(attr: str) → flupy.fluent.Fluent[typing.Any][Any]¶ Extracts the attribute attr from each element of the iterable
>>> from collections import namedtuple
>>> MyTup = namedtuple('MyTup', ['value', 'backup_val'])
>>> flu([MyTup(1, 5), MyTup(2, 4)]).map_attr('value').to_list()
[1, 2]
-
flu.
map_item
(item: Hashable) → flupy.fluent.Fluent[flupy.fluent.SupportsGetItem[~T]][flupy.fluent.SupportsGetItem[~T][T]]¶ Extracts item from every element of the iterable
>>> flu([(2, 4), (2, 5)]).map_item(1).to_list() [4, 5]
>>> flu([{'mykey': 8}, {'mykey': 5}]).map_item('mykey').to_list() [8, 5]
-
flu.
zip
(*iterable) → Union[flupy.fluent.Fluent[typing.Tuple[~T, ...]][Tuple[T, ...]], flupy.fluent.Fluent[typing.Tuple[~T, ~_T1]][Tuple[T, _T1]], flupy.fluent.Fluent[typing.Tuple[~T, ~_T1, ~_T2]][Tuple[T, _T1, _T2]], flupy.fluent.Fluent[typing.Tuple[~T, ~_T1, ~_T2, ~_T3]][Tuple[T, _T1, _T2, _T3]]]¶ Yields tuples pairing the i-th element of the instance with the i-th element of each supplied iterable
>>> flu(range(5)).zip(range(3, 0, -1)).to_list() [(0, 3), (1, 2), (2, 1)]
-
flu.
zip_longest
(*iterable, fill_value: Any = None) → flupy.fluent.Fluent[typing.Tuple[~T, ...]][Tuple[T, ...]]¶ Yields tuples pairing the i-th element of the instance with the i-th element of each supplied iterable. Iteration continues until the longest iterable is exhausted. If iterables are uneven in length, missing values are filled in with fill_value
>>> flu(range(5)).zip_longest(range(3, 0, -1)).to_list() [(0, 3), (1, 2), (2, 1), (3, None), (4, None)]
>>> flu(range(5)).zip_longest(range(3, 0, -1), fill_value='a').to_list() [(0, 3), (1, 2), (2, 1), (3, 'a'), (4, 'a')]
Side Effects¶
-
flu.
rate_limit
(per_second: Union[int, float] = 100) → flupy.fluent.Fluent[~T][T]¶ Restrict consumption of the iterable to per_second items per second
>>> import time
>>> start_time = time.time()
>>> _ = flu(range(3)).rate_limit(3).to_list()
>>> print('Runtime', time.time() - start_time)
Runtime 1.00126
# approximately 1 second for 3 items
-
flu.
side_effect
(func: Callable[[T], Any], before: Optional[Callable[[], Any]] = None, after: Optional[Callable[[], Any]] = None) → flupy.fluent.Fluent[~T][T]¶ Invoke func for each item in the iterable before yielding the item. func takes a single argument and its output is discarded. before and after are optional functions that take no parameters; they are executed once before iteration begins and once after iteration ends, respectively. Each will be called exactly once.
>>> flu(range(2)).side_effect(lambda x: print(f'Collected {x}')).to_list()
Collected 0
Collected 1
[0, 1]
Summarizing¶
-
flu.
count
() → int¶ Count of elements in the iterable
>>> flu(['a','b','c']).count() 3
-
flu.
sum
() → Union[T, int]¶ Sum of elements in the iterable
>>> flu([1,2,3]).sum() 6
-
flu.
min
() → SupportsLessThanT¶ Smallest element in the iterable
>>> flu([1, 3, 0, 2]).min() 0
-
flu.
max
() → SupportsLessThanT¶ Largest element in the iterable
>>> flu([0, 3, 2, 1]).max() 3
-
flu.
reduce
(func: Callable[[T, T], T]) → T¶ Apply a function of two arguments cumulatively to the items of the iterable, from left to right, so as to reduce the sequence to a single value
>>> flu(range(5)).reduce(lambda x, y: x + y) 10
-
flu.
fold_left
(func: Callable[[S, T], S], initial: S) → S¶ Apply a function of two arguments cumulatively to the items of the iterable, from left to right, starting with initial, so as to fold the sequence to a single value
>>> flu(range(5)).fold_left(lambda x, y: x + str(y), "") '01234'
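fold_left corresponds to the standard library's functools.reduce called with an initializer:

```python
from functools import reduce

# reduce(func, iterable, initial) folds left-to-right, starting the
# accumulator at `initial` -- here the empty string.
result = reduce(lambda acc, y: acc + str(y), range(5), "")
print(result)  # 01234
```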
-
flu.
first
(default: Any = <flupy.fluent.Empty object>) → T¶ Return the first item of the iterable. If the iterable is empty, return default if provided; otherwise raise IndexError.
>>> flu([0, 1, 2, 3]).first()
0
>>> flu([]).first(default="some_default")
'some_default'
-
flu.
last
(default: Any = <flupy.fluent.Empty object>) → T¶ Return the last item of the iterable. If the iterable is empty, return default if provided; otherwise raise IndexError.
>>> flu([0, 1, 2, 3]).last()
3
>>> flu([]).last(default='some_default')
'some_default'
-
flu.
head
(n: int = 10, container_type: Callable[[Iterable[T]], Collection[T]] = <class 'list'>) → Collection[T]¶ Returns up to the first n elements from the iterable.
>>> flu(range(20)).head() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> flu(range(15)).head(n=2) [0, 1]
>>> flu([]).head() []
-
flu.
tail
(n: int = 10, container_type: Callable[[Iterable[T]], Collection[T]] = <class 'list'>) → Collection[T]¶ Return up to the last n elements from the iterable
>>> flu(range(20)).tail() [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>> flu(range(15)).tail(n=2) [13, 14]
-
flu.
to_list
() → List[T]¶ Collect items from iterable into a list
>>> flu(range(4)).to_list() [0, 1, 2, 3]
-
flu.
collect
(n: int = None, container_type: Callable[[Iterable[T]], Collection[T]] = <class 'list'>) → Collection[T]¶ Collect items from iterable into a container
>>> flu(range(4)).collect() [0, 1, 2, 3]
>>> flu(range(4)).collect(container_type=set) {0, 1, 2, 3}
>>> flu(range(4)).collect(n=2) [0, 1]
Non-Constant Memory¶
-
flu.
group_by
(key: Callable[[T], Union[T, _T1]] = <function identity>, sort: bool = True) → flupy.fluent.Fluent[typing.Tuple[typing.Union[~T, ~_T1], flupy.fluent.Fluent[~T]]][Tuple[Union[T, _T1], flupy.fluent.Fluent[~T][T]]] Yield consecutive keys and groups from the iterable
key is a function to compute a key value used in grouping and sorting for each element. key defaults to an identity function which returns the unchanged element
When the iterable is pre-sorted according to key, setting sort to False will prevent loading the dataset into memory and improve performance
>>> flu([2, 4, 2, 4]).group_by().to_list()
[(2, <flu object>), (4, <flu object>)]
Or, if the iterable is pre-sorted
>>> flu([2, 2, 5, 5]).group_by(sort=False).to_list() [(2, <flu object>), (5, <flu object>)]
Using a key function
>>> points = [{'x': 1, 'y': 0}, {'x': 4, 'y': 3}, {'x': 1, 'y': 5}]
>>> key_func = lambda u: u['x']
>>> flu(points).group_by(key=key_func, sort=True).to_list()
[(1, <flu object>), (4, <flu object>)]
-
flu.
join_left
(other: Iterable[_T1], key: Callable[[T], Hashable] = <function identity>, other_key: Callable[[_T1], Hashable] = <function identity>) → flupy.fluent.Fluent[typing.Tuple[~T, typing.Union[~_T1, NoneType]]][Tuple[T, Optional[_T1]]] Join the iterable with another iterable using equality between key applied to self and other_key applied to other to identify matching entries
When no matching entry is found in other, entries in the iterable are paired with None
Note: join_left loads other into memory
>>> flu(range(6)).join_left(range(0, 6, 2)).to_list() [(0, 0), (1, None), (2, 2), (3, None), (4, 4), (5, None)]
-
flu.
join_inner
(other: Iterable[_T1], key: Callable[[T], Hashable] = <function identity>, other_key: Callable[[_T1], Hashable] = <function identity>) → flupy.fluent.Fluent[typing.Tuple[~T, ~_T1]][Tuple[T, _T1]] Join the iterable with another iterable using equality between key applied to self and other_key applied to other to identify matching entries
When no matching entry is found in other, entries in the iterable are filtered from the results
Note: join_inner loads other into memory
>>> flu(range(6)).join_inner(range(0, 6, 2)).to_list() [(0, 0), (2, 2), (4, 4)]
-
flu.
shuffle
() → flupy.fluent.Fluent[~T][T]¶ Randomize the order of elements in the iterable
Note: shuffle loads the entire iterable into memory
>>> flu([3,6,1]).shuffle().to_list() [6, 1, 3]
-
flu.
sort
(key: Optional[Callable[[Any], Any]] = None, reverse: bool = False) → flupy.fluent.Fluent[~SupportsLessThanT][SupportsLessThanT]¶ Sort iterable by key function if provided or identity otherwise
Note: sorting loads the entire iterable into memory
>>> flu([3,6,1]).sort().to_list() [1, 3, 6]
>>> flu([3,6,1]).sort(reverse=True).to_list() [6, 3, 1]
>>> flu([3,-6,1]).sort(key=abs).to_list() [1, 3, -6]
-
flu.
tee
(n: int = 2) → flupy.fluent.Fluent[flupy.fluent.Fluent[~T]][flupy.fluent.Fluent[~T][T]]¶ Return n independent iterators from a single iterable
Once tee() has made a split, the original iterable should not be used anywhere else; otherwise, the iterable could get advanced without the tee objects being informed.
>>> copy1, copy2 = flu(range(5)).tee()
>>> copy1.sum()
10
>>> copy2.to_list()
[0, 1, 2, 3, 4]
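The standard library's itertools.tee carries the same contract and the same caveat; a stdlib illustration of both the split and the failure mode:

```python
from itertools import tee

copy1, copy2 = tee(range(5), 2)
total = sum(copy1)            # each copy advances independently
items = list(copy2)
print(total, items)           # 10 [0, 1, 2, 3, 4]

# The caveat in action: advancing the *original* iterator after a
# split silently skips values in the tee'd copies.
source = iter(range(5))
a, b = tee(source, 2)
next(source)                  # consumes 0 behind tee's back
remaining = list(a)
print(remaining)              # [1, 2, 3, 4] -- the 0 is lost
```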
-
flu.
unique
(key: Callable[[T], Hashable] = <function identity>) → flupy.fluent.Fluent[~T][T] Yield elements that are unique by a key.
>>> flu([2, 3, 2, 3]).unique().to_list() [2, 3]
>>> flu([2, -3, -2, 3]).unique(key=abs).to_list() [2, -3]
Command Line¶
The flupy CLI is a platform-agnostic application that gives full access to the flupy API and Python from your shell.
Usage¶
$ flu -h
usage: flu [-h] [-v] [-f FILE] [-i [IMPORT [IMPORT ...]]] command
flupy: a fluent interface for python
positional arguments:
command command to execute against input
optional arguments:
-h, --help show this help message and exit
-v, --version show program's version number and exit
-f FILE, --file FILE path to input file
-i [IMPORT [IMPORT ...]], --import [IMPORT [IMPORT ...]]
modules to import
Syntax: <module>:<object>:<alias>
Basic Examples¶
When input data is provided to the flu command, an instance of the flu object is prepared with that input and stored in the variable _.
Note
For more information on writing flupy commands, see the API Reference
Piping from another command (stdin)¶
Example: Show lines of a log file that are errors:
$ cat logs.txt | flu '_.filter(lambda x: x.startswith("ERROR"))'
Reading from a file¶
Example: Show lines of a log file that are errors:
$ flu -f logs.txt '_.filter(lambda x: x.startswith("ERROR"))'
No Input data¶
flupy does not require input data if the data can be generated from within Python, e.g. with range(10). When no input data is provided, the iterable at the beginning of the flupy command must be wrapped in a flu instance.
Example: Even integers less than 10:
$ flu 'flu(range(10)).filter(lambda x: x%2==0)'
Import System¶
Passing -i or --import to the CLI allows you to import standard and third party libraries installed in the same environment.
Import syntax
-i <module>:<object>:<alias>
Note
For multiple imports, pass -i multiple times
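The spec-to-import mapping can be sketched in plain Python. parse_import_spec below is a hypothetical helper for illustration, not flupy's actual CLI code:

```python
import importlib

def parse_import_spec(spec):
    """Resolve '<module>:<object>:<alias>' into a (name, value) pair."""
    parts = spec.split(":")
    module_name = parts[0]
    obj_name = parts[1] if len(parts) > 1 and parts[1] else None
    alias = parts[2] if len(parts) > 2 and parts[2] else None
    module = importlib.import_module(module_name)
    value = getattr(module, obj_name) if obj_name else module
    # The name bound in the command's namespace: alias, then object,
    # then the module name itself.
    name = alias or obj_name or module_name
    return name, value

print(parse_import_spec("os:environ:env")[0])  # env   (from os import environ as env)
print(parse_import_spec("os::opsys")[0])       # opsys (import os as opsys)
```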
Import Examples¶
import os:
$ flu 'flu(os.environ)' -i os
from os import environ:
$ flu 'flu(environ)' -i os:environ
from os import environ as env:
$ flu 'flu(env)' -i os:environ:env
import os as opsys:
$ flu 'flu(opsys.environ)' -i os::opsys
License¶
flupy is under the MIT License. See the LICENSE file.
Conditions for Contributors¶
By contributing to this software project, you are agreeing to the following terms and conditions for your contributions: First, you agree your contributions are submitted under the MIT license. Second, you represent you are authorized to make the contributions and grant the license. If your employer has rights to intellectual property that includes your contributions, you represent that you have received permission to make contributions and grant the required license on behalf of that employer.
Design Influences¶
Version History¶
1.0.0¶
- New Capabilities:
- Everything
1.1.0¶
- Remove support for calling instance methods on an uninitialized flu class, passing an iterable as the self argument
- Remove flupy.Fluent from top level flupy public API
- Remove flupy.with_iter from API
1.1.2¶
- Change Fluent class name to flu and remove class alias to improve docs readability
- Add type hints for flu.sum