Mapytex/mapytex/calculus/core/str2.py

516 lines
12 KiB
Python

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2017 lafrite <lafrite@Poivre>
#
# Distributed under terms of the MIT license.
"""
Converting a string with coroutines
"""
from functools import partial
from decimal import Decimal
from .coroutine import *
# Public API: a star-import of this module only exposes the `str2` factory.
__all__ = ["str2", ]
def maybe_it_is(cara):
    """ Build a three-state matcher for the string ``cara``.

    The returned function takes a string and answers:

    - ``True`` when the string is exactly ``cara``,
    - ``"maybe"`` when the string is a strict prefix of ``cara``,
    - ``False`` otherwise.

    :param cara: the string to recognise
    :returns: a one-argument function returning True, "maybe" or False

    :example:

    >>> it_is_pipo = maybe_it_is("pipo")
    >>> it_is_pipo("pi")
    'maybe'
    >>> it_is_pipo("pipo")
    True
    >>> it_is_pipo("uo")
    False
    >>> it_is_iuo = maybe_it_is("iuo")
    >>> it_is_iuo("pi")
    False
    >>> it_is_iuo("iuo")
    True
    >>> it_is_iuo("uo")
    False
    """
    def func(text):
        # Exact match wins over the prefix test (cara is a prefix of itself).
        if text == cara:
            return True
        return "maybe" if cara.startswith(text) else False

    return func
def something_in(cara_list):
    """ Build a predicate telling whether its argument belongs to ``cara_list``.

    :param cara_list: container (string, list, ...) tested with ``in``
    :returns: a one-argument function returning True or False
    """
    def func(candidate):
        # `in` always evaluates to a real bool, whatever the container is.
        return candidate in cara_list

    return func
@coroutine
def lookfor(condition, replace=lambda x: ''.join(x)):
    """ Sink which accumulates sent items until ``condition`` settles.

    Every item sent is appended to an internal accumulation. The
    accumulation (items stringified and joined) is given to ``condition``:

    - ``"maybe"``: the coroutine answers ``"maybe"`` and keeps accumulating,
    - truthy: it answers ``replace(accumulation)`` and starts over,
    - falsy: it answers ``False`` and starts over.

    Sending ``None`` changes nothing (the previous answer is yielded again).
    Throwing ``RESTAAART`` empties the accumulation and answers ``False``.

    :param condition: function of a string returning a boolean or "maybe"
    :param replace: function applied to the list of accumulated items when
        the pattern is found. Joins the items by default.

    :example:

    >>> lf = lookfor(maybe_it_is("pipo"), lambda x: 'remp')
    >>> acc = ""
    >>> for i in 'popipo':
    ...     a = lf.send(i)
    ...     if a == "maybe":
    ...         acc += i
    ...     elif a:
    ...         acc = ""
    ...         print(a)
    ...     else:
    ...         print(acc + i)
    ...         acc = ""
    po
    remp
    >>> lf = lookfor(maybe_it_is("po") , lambda x: ''.join(x).upper())
    >>> acc = ""
    >>> for i in 'popipo':
    ...     a = lf.send(i)
    ...     if a == "maybe":
    ...         acc += i
    ...     elif a:
    ...         acc = ""
    ...         print(a)
    ...     else:
    ...         print(acc + i)
    ...         acc = ""
    PO
    pi
    PO
    >>> lf = lookfor(maybe_it_is("pi") , lambda x: 1)
    >>> acc = ""
    >>> for i in 'popipop':
    ...     a = lf.send(i)
    ...     if a == "maybe":
    ...         acc += i
    ...     elif a:
    ...         acc = ""
    ...         print(a)
    ...     else:
    ...         print(acc + i)
    ...         acc = ""
    po
    1
    po
    >>> # the last p is still being processed
    >>> acc = ""
    >>> for i in 'iop':
    ...     a = lf.send(i)
    ...     if a == "maybe":
    ...         acc += i
    ...     elif a:
    ...         acc = ""
    ...         print(a)
    ...     else:
    ...         print(acc + i)
    ...         acc = ""
    1
    o
    >>> # the last p is still being processed
    >>> lf.throw(RESTAAART)
    False
    >>> # p has been forgotten
    >>> acc = ""
    >>> for i in 'ipopi':
    ...     a = lf.send(i)
    ...     if a == "maybe":
    ...         acc += i
    ...     elif a:
    ...         acc = ""
    ...         print(a)
    ...     else:
    ...         print(acc + i)
    ...         acc = ""
    i
    po
    1
    """
    buffer = []
    answer = False
    while True:
        try:
            item = yield answer
        except RESTAAART:
            # Reset request: forget everything accumulated so far.
            buffer, answer = [], False
            continue

        if item is None:
            # Nothing to process; the previous answer will be yielded again.
            continue

        buffer.append(item)
        verdict = condition("".join(map(str, buffer)))
        if verdict == "maybe":
            answer = "maybe"
            continue

        # Settled either way: compute the answer, then start a new buffer.
        answer = replace(buffer) if verdict else False
        buffer = []
@coroutine
def remember_lookfor(lookfor):
    """ Wrap a lookfor coroutine and remember what it did not match.

    Each sent item is forwarded to ``lookfor``:

    - if it answers ``"maybe"``, the item is kept aside and ``"maybe"`` is
      answered,
    - if it finds something, the answer is the list of remembered items
      followed by the found value, and the memory is emptied,
    - otherwise the items kept aside and the current one are remembered and
      ``False`` is answered.

    Throwing ``RESTAAART`` resets ``lookfor`` too and answers with everything
    remembered (items kept aside included) before forgetting it.

    :param lookfor: a lookfor coroutine

    :example:

    >>> lkf = lookfor(maybe_it_is("pipo"), lambda x: 'remp')
    >>> rmb_lkf = remember_lookfor(lkf)
    >>> for i in 'popipopi':
    ...     print(rmb_lkf.send(i))
    maybe
    False
    maybe
    maybe
    maybe
    ['p', 'o', 'remp']
    maybe
    maybe
    >>> for i in 'popi':
    ...     print(rmb_lkf.send(i))
    maybe
    ['remp']
    maybe
    maybe
    >>> rmb_lkf.throw(RESTAAART)
    ['p', 'i']
    >>> for i in 'popi':
    ...     print(rmb_lkf.send(i))
    maybe
    False
    maybe
    maybe
    """
    rejected = []   # items the lookfor definitely did not match
    pending = []    # items of a match still in progress ("maybe")
    reply = False
    while True:
        try:
            item = yield reply
        except RESTAAART as reset:
            # Propagate the reset, then hand back everything remembered.
            lookfor.throw(reset)
            reply = rejected + pending
            rejected, pending = [], []
            continue

        state = lookfor.send(item)
        if state == "maybe":
            pending.append(item)
            reply = "maybe"
        elif state:
            reply = rejected + [state]
            rejected, pending = [], []
        else:
            # The tentative match failed: its items become rejected ones.
            rejected.extend(pending)
            rejected.append(item)
            pending = []
            reply = False
@coroutine
def concurent_broadcast(target, lookfors=()):
    """ Coroutine which broadcasts to multiple lookfor coroutines and
    reinitializes them when one of them finds something

    Same as parallelized_lookfor but coroutine version.

    Each token is sent to the lookfors in order (each one wrapped in a
    remember_lookfor). As soon as one of them finds something, all of them
    are reset and the remembered tokens followed by the found value are
    sent to the target. On ``STOOOP`` the tokens still remembered are
    flushed to the target and the target's final answer is yielded.

    Without any lookfor, tokens are forwarded to the target unchanged.

    :param target: target to send data to (a coroutine or a callable
        building one)
    :param lookfors: iterable of lookfor coroutines

    :example:

    >>> lf1 = lookfor(maybe_it_is("abc"), lambda x: "".join(x).upper())
    >>> searcher = concurent_broadcast(list_sink, [lf1])
    >>> for i in "azabcab":
    ...     searcher.send(i)
    >>> a = searcher.throw(STOOOP)
    >>> print(a)
    ['a', 'z', 'ABC', 'a', 'b']
    >>> lf2 = lookfor(maybe_it_is("az"))
    >>> searcher = concurent_broadcast(list_sink, [lf1, lf2])
    >>> for i in "azabcabazb":
    ...     searcher.send(i)
    >>> a = searcher.throw(STOOOP)
    >>> print(a)
    ['az', 'ABC', 'a', 'b', 'az', 'b']
    >>> lfop = lookfor(something_in("+-*/()"), lambda x: f"op{x}")
    >>> searcher = concurent_broadcast(list_sink, [lfop])
    >>> for i in '12+3+234':
    ...     searcher.send(i)
    >>> a = searcher.throw(STOOOP)
    >>> print(a)
    ['1', '2', "op['+']", '3', "op['+']", '2', '3', '4']
    >>> # need to remake a searcher otherwise it remembers old ans
    >>> searcher = concurent_broadcast(list_sink, [lfop])
    >>> for i in '12*(3+234)':
    ...     searcher.send(i)
    >>> a = searcher.throw(STOOOP)
    >>> print(a)
    ['1', '2', "op['*']", "op['(']", '3', "op['+']", '2', '3', '4', "op[')']"]
    >>> lfsqrt = lookfor(maybe_it_is("sqrt"), lambda x: f"op['sqrt']")
    >>> searcher = concurent_broadcast(list_sink, [lfop, lfsqrt])
    >>> for i in '3+2*sqrt(3)':
    ...     searcher.send(i)
    >>> a = searcher.throw(STOOOP)
    >>> print(a)
    ['3', "op['+']", '2', "op['*']", "op['sqrt']", "op['(']", '3', "op[')']"]
    >>> # without lookfors, tokens go straight to the target
    >>> searcher = concurent_broadcast(list_sink)
    >>> for i in 'ab':
    ...     searcher.send(i)
    >>> a = searcher.throw(STOOOP)
    >>> print(a)
    ['a', 'b']
    """
    # `target` is either a factory to call or an already built coroutine
    # (calling the latter raises TypeError).
    try:
        target_ = target()
    except TypeError:
        target_ = target

    lookfors_ = [remember_lookfor(lkf) for lkf in lookfors]

    try:
        while True:
            tok = yield
            if tok is None:
                continue

            if not lookfors_:
                # Nobody can remember the token for us: forward it right
                # away instead of losing it.
                target_.send(tok)
                continue

            found = False
            for lf in lookfors_:
                lf_ans = lf.send(tok)
                if lf_ans and lf_ans != "maybe":
                    found = lf_ans
                    break

            if found:
                # One lookfor settled: every lookfor starts over and the
                # remembered tokens + found value go downstream.
                for lf in lookfors_:
                    lf.throw(RESTAAART)
                for i in found:
                    target_.send(i)
    except STOOOP as err:
        # Every wrapper has seen the same tokens since the last reset, so
        # they all remember the same leftovers: flush them only once.
        last = []
        for lf in lookfors_:
            last = lf.throw(RESTAAART)
        for i in last:
            target_.send(i)
        yield target_.throw(err)
@coroutine
def lookforNumbers(target):
    """ Coroutine which groups digit tokens into numbers

    Consecutive digits (with at most one dot, not in first position) are
    accumulated and sent to the target as an ``int`` or a ``Decimal`` as
    soon as another token arrives or on ``STOOOP``. Other tokens are
    forwarded untouched.

    :param target: target to send data to (a coroutine or a callable
        building one)
    :raises ValueError: on a second dot in a number or on a leading dot

    :example:

    >>> str2list = lookforNumbers(list_sink)
    >>> for i in "12+1234*67":
    ...     str2list.send(i)
    >>> a = str2list.throw(STOOOP)
    >>> print(a)
    [12, '+', 1234, '*', 67]
    >>> str2list = lookforNumbers(list_sink)
    >>> for i in "1.2+12.34*67":
    ...     str2list.send(i)
    >>> a = str2list.throw(STOOOP)
    >>> print(a)
    [Decimal('1.2'), '+', Decimal('12.34'), '*', 67]
    >>> str2list = lookforNumbers(list_sink)
    >>> for i in "12.3.4*67":
    ...     str2list.send(i)
    Traceback (most recent call last):
        ...
    ValueError: Can't build a number with 2 dots (current is 12.3)
    >>> str2list = lookforNumbers(list_sink)
    >>> for i in ".34*67":
    ...     a = str2list.send(i)
    Traceback (most recent call last):
        ...
    ValueError: Can't build a number starting with a dot
    """
    # `target` is either a factory to call or an already built coroutine
    # (calling the latter raises TypeError).
    try:
        downstream = target()
    except TypeError:
        downstream = target

    digits = ""
    try:
        while True:
            tok = yield
            if tok is None:
                continue
            try:
                # Only used as a "is it a digit?" probe.
                int(tok)
            except ValueError:
                if tok != '.':
                    # A non numeric token ends the number being built.
                    if digits:
                        downstream.send(typifiy_numbers(digits))
                        digits = ""
                    downstream.send(tok)
                elif "." in digits:
                    raise ValueError(f"Can't build a number with 2 dots (current is {digits})")
                elif not digits:
                    raise ValueError("Can't build a number starting with a dot")
                else:
                    digits += tok
            else:
                digits += tok
    except STOOOP as err:
        # Flush the number still being built before stopping the target.
        if digits:
            downstream.send(typifiy_numbers(digits))
        yield downstream.throw(err)
def typifiy_numbers(number):
    """ Convert a number written as a string into an int or a Decimal

    :param number: string representation of the number
    :returns: an ``int`` when ``int(number)`` succeeds, otherwise a
        ``Decimal`` built from the same string
    """
    try:
        value = int(number)
    except ValueError:
        # Not an integer literal (e.g. "1.2"): keep it exact with Decimal.
        value = Decimal(number)
    return value
@coroutine
def pparser(target):
    """ Parenthesis parser sink

    Tokens are forwarded to a sink built with ``target()``. An opening
    parenthesis delegates to a nested pparser whose result is sent to the
    sink as a single item; a closing parenthesis stops the sink and returns
    its answer. On ``STOOOP`` the sink's final answer is yielded.

    :param target: callable building the sink coroutine

    :example:

    >>> pp = pparser(list_sink)
    >>> for i in "buio":
    ...     pp.send(i)
    >>> a = pp.throw(STOOOP)
    >>> a
    ['b', 'u', 'i', 'o']
    >>> pp = pparser(list_sink)
    >>> for i in "agg(bcz)e(i(o))":
    ...     pp.send(i)
    >>> a = pp.throw(STOOOP)
    >>> a
    ['a', 'g', 'g', ['b', 'c', 'z'], 'e', ['i', ['o']]]
    """
    sink = target()
    try:
        while True:
            char = yield
            if char == "(":
                # The nested parser consumes tokens until its own ")".
                nested = yield from pparser(target)
                sink.send(nested)
                continue
            if char == ")":
                # End of this level: its sink's answer becomes the result.
                return sink.throw(STOOOP)
            sink.send(char)
    except STOOOP as err:
        yield sink.throw(err)
@coroutine
def list_sink():
    """ Testing sink for coroutines

    Collects every item sent (``None`` is ignored) and yields the list of
    collected items when ``STOOOP`` is thrown.

    :example:

    >>> sink = list_sink()
    >>> for i in '12+34 * 3':
    ...     sink.send(i)
    >>> a = sink.throw(STOOOP)
    >>> a
    ['1', '2', '+', '3', '4', ' ', '*', ' ', '3']
    """
    collected = []
    try:
        while True:
            item = yield
            if item is None:
                continue
            collected.append(item)
    except STOOOP:
        yield collected
def str2(sink):
    """ Return a pipeline which parses an expression with the sink as an endpoint

    The returned function sends the expression, character by character,
    through lookforNumbers, then an operator lookfor (``+-*/``), then
    pparser built on ``sink``, and returns what the pipeline answers to
    ``STOOOP``.

    :param sink: callable building the final sink coroutine
    :returns: a function taking the expression to parse

    :example:

    >>> str2nestedlist = str2(list_sink)
    >>> exp = "12+3*4"
    >>> t = str2nestedlist(exp)
    >>> print(t)
    [12, '+', 3, '*', 4]
    >>> exp = "12*3+4"
    >>> t = str2nestedlist(exp)
    >>> print(t)
    [12, '*', 3, '+', 4]
    >>> exp = "12*(3+4)"
    >>> t = str2nestedlist(exp)
    >>> print(t)
    [12, '*', [3, '+', 4]]
    >>> from .tree import MutableTree
    >>> str2tree = str2(MutableTree.sink)
    >>> exp = "12+3*4"
    >>> t = str2tree(exp)
    >>> print(t)
    +
    > 12
    > *
    | > 3
    | > 4
    >>> exp = "12*3+4"
    >>> t = str2tree(exp)
    >>> print(t)
    +
    > *
    | > 12
    | > 3
    > 4
    >>> exp = "12*(3+4)"
    >>> t = str2tree(exp)
    >>> print(t)
    *
    > 12
    > +
    | > 3
    | > 4
    """
    # The operator lookfor is built once and shared by every pipeline run.
    operator_lookfor = lookfor(something_in("+-*/"))
    build_operator_stage = partial(concurent_broadcast, lookfors = [operator_lookfor])

    def pipeline(expression):
        # A fresh chain of coroutines is assembled for each expression.
        chain = lookforNumbers(build_operator_stage(pparser(sink)))
        for char in expression:
            chain.send(char)
        return chain.throw(STOOOP)

    return pipeline
# Ready-made parser: the str2 pipeline with `list_sink` as its endpoint.
str2nestedlist = str2(list_sink)
# -----------------------------
# Reglages pour 'vim'
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
# cursor: 16 del