708 lines
17 KiB
Python
708 lines
17 KiB
Python
#! /usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# vim:fenc=utf-8
|
|
#
|
|
# Copyright © 2017 lafrite <lafrite@Poivre>
|
|
#
|
|
# Distributed under terms of the MIT license.
|
|
|
|
"""
|
|
Converting a string with coroutines
|
|
"""
|
|
|
|
from functools import partial
|
|
from decimal import Decimal
|
|
from .coroutine import *
|
|
from .operator import is_operator
|
|
from .MO import moify
|
|
|
|
__all__ = ["str2", ]
|
|
|
|
class ParsingError(Exception):
|
|
pass
|
|
|
|
|
|
def maybe_it_is(cara):
|
|
""" Return a function which return
|
|
|
|
Maybe if cara startwith the argument and True if it is cara
|
|
|
|
:exemple:
|
|
|
|
>>> it_is_pipo = maybe_it_is("pipo")
|
|
>>> it_is_pipo("pi")
|
|
'maybe'
|
|
>>> it_is_pipo("pipo")
|
|
True
|
|
>>> it_is_pipo("uo")
|
|
False
|
|
>>> it_is_iuo = maybe_it_is("iuo")
|
|
>>> it_is_iuo("pi")
|
|
False
|
|
>>> it_is_iuo("iuo")
|
|
True
|
|
>>> it_is_iuo("uo")
|
|
False
|
|
"""
|
|
def func(c):
|
|
if c == cara:
|
|
return True
|
|
elif cara.startswith(c):
|
|
return "maybe"
|
|
else:
|
|
return False
|
|
return func
|
|
|
|
def something_in(cara_list):
|
|
""" Return a function which sais whether a caracter is in cara_list or not
|
|
"""
|
|
def func(c):
|
|
if c in cara_list:
|
|
return True
|
|
else:
|
|
return False
|
|
return func
|
|
|
|
@coroutine
|
|
def lookfor(condition, replace = lambda x:''.join(x)):
|
|
""" Sink which is looking for term and yield replace founded patern
|
|
with the lenght of the accumulation.
|
|
|
|
It can be reset by sending None
|
|
|
|
:param condition: conditional function which returns a boolean or "maybe"
|
|
:param replace: function which make a final transformation to the founded
|
|
string. Id by default.
|
|
|
|
:example:
|
|
|
|
>>> lf = lookfor(maybe_it_is("pipo"), lambda x: 'remp')
|
|
>>> acc = ""
|
|
>>> for i in 'popipo':
|
|
... a = lf.send(i)
|
|
... if a == "maybe":
|
|
... acc += i
|
|
... elif a:
|
|
... acc = ""
|
|
... print(a)
|
|
... else:
|
|
... print(acc + i)
|
|
... acc = ""
|
|
po
|
|
remp
|
|
>>> lf = lookfor(maybe_it_is("po") , lambda x: ''.join(x).upper())
|
|
>>> acc = ""
|
|
>>> for i in 'popipo':
|
|
... a = lf.send(i)
|
|
... if a == "maybe":
|
|
... acc += i
|
|
... elif a:
|
|
... acc = ""
|
|
... print(a)
|
|
... else:
|
|
... print(acc + i)
|
|
... acc = ""
|
|
PO
|
|
pi
|
|
PO
|
|
>>> lf = lookfor(maybe_it_is("pi") , lambda x: 1)
|
|
>>> acc = ""
|
|
>>> for i in 'popipop':
|
|
... a = lf.send(i)
|
|
... if a == "maybe":
|
|
... acc += i
|
|
... elif a:
|
|
... acc = ""
|
|
... print(a)
|
|
... else:
|
|
... print(acc + i)
|
|
... acc = ""
|
|
po
|
|
1
|
|
po
|
|
>>> # the p in still processing
|
|
>>> acc = ""
|
|
>>> for i in 'iop':
|
|
... a = lf.send(i)
|
|
... if a == "maybe":
|
|
... acc += i
|
|
... elif a:
|
|
... acc = ""
|
|
... print(a)
|
|
... else:
|
|
... print(acc + i)
|
|
... acc = ""
|
|
1
|
|
o
|
|
>>> # the p in still processing
|
|
>>> lf.throw(RESTAAART)
|
|
False
|
|
>>> # p has been forgot
|
|
>>> acc = ""
|
|
>>> for i in 'ipopi':
|
|
... a = lf.send(i)
|
|
... if a == "maybe":
|
|
... acc += i
|
|
... elif a:
|
|
... acc = ""
|
|
... print(a)
|
|
... else:
|
|
... print(acc + i)
|
|
... acc = ""
|
|
i
|
|
po
|
|
1
|
|
|
|
"""
|
|
acc = []
|
|
ans = False
|
|
while True:
|
|
try:
|
|
c = (yield ans)
|
|
except RESTAAART as err:
|
|
acc = []
|
|
ans = False
|
|
else:
|
|
if c is not None:
|
|
acc.append(c)
|
|
found = condition(''.join([str(i) for i in acc]))
|
|
if found == "maybe":
|
|
ans = "maybe"
|
|
elif found:
|
|
ans = replace(acc)
|
|
acc = []
|
|
else:
|
|
ans = False
|
|
acc = []
|
|
|
|
@coroutine
|
|
def remember_lookfor(lookfor):
|
|
""" Coroutine which remember sent value before the lookfor finds something
|
|
|
|
:example:
|
|
|
|
>>> lkf = lookfor(maybe_it_is("pipo"), lambda x: 'remp')
|
|
>>> rmb_lkf = remember_lookfor(lkf)
|
|
>>> for i in 'popipopi':
|
|
... print(rmb_lkf.send(i))
|
|
maybe
|
|
False
|
|
maybe
|
|
maybe
|
|
maybe
|
|
['p', 'o', 'remp']
|
|
maybe
|
|
maybe
|
|
>>> for i in 'popi':
|
|
... print(rmb_lkf.send(i))
|
|
maybe
|
|
['remp']
|
|
maybe
|
|
maybe
|
|
>>> rmb_lkf.throw(RESTAAART)
|
|
['p', 'i']
|
|
>>> for i in 'popi':
|
|
... print(rmb_lkf.send(i))
|
|
maybe
|
|
False
|
|
maybe
|
|
maybe
|
|
|
|
"""
|
|
acc = []
|
|
mb_acc = []
|
|
ans = False
|
|
while True:
|
|
try:
|
|
c = (yield ans)
|
|
except RESTAAART as err:
|
|
lookfor.throw(err)
|
|
ans = acc + mb_acc
|
|
acc = []
|
|
mb_acc = []
|
|
else:
|
|
lkf_state = lookfor.send(c)
|
|
if lkf_state == "maybe":
|
|
mb_acc.append(c)
|
|
ans = "maybe"
|
|
elif lkf_state:
|
|
ans = acc + [lkf_state]
|
|
acc = []
|
|
mb_acc = []
|
|
else:
|
|
acc += mb_acc
|
|
mb_acc = []
|
|
acc.append(c)
|
|
ans = False
|
|
|
|
@coroutine
|
|
def concurent_broadcast(target, lookfors = []):
|
|
""" Coroutine which broadcasts multiple lookfor coroutines and reinitializes
|
|
them when one found something
|
|
|
|
Same as parallelized_lookfor but coroutine version
|
|
|
|
:param target: target to send data to
|
|
:param lookfors: list of lookfor coroutines
|
|
|
|
:example:
|
|
>>> lf1 = lookfor(maybe_it_is("abc"), lambda x: "".join(x).upper())
|
|
>>> searcher = concurent_broadcast(list_sink, [lf1])
|
|
>>> for i in "azabcab":
|
|
... searcher.send(i)
|
|
>>> a = searcher.throw(STOOOP)
|
|
>>> print(a)
|
|
['a', 'z', 'ABC', 'a', 'b']
|
|
|
|
>>> lf2 = lookfor(maybe_it_is("az"))
|
|
>>> searcher = concurent_broadcast(list_sink, [lf1, lf2])
|
|
>>> for i in "azabcabazb":
|
|
... searcher.send(i)
|
|
>>> a = searcher.throw(STOOOP)
|
|
>>> print(a)
|
|
['az', 'ABC', 'a', 'b', 'az', 'b']
|
|
|
|
>>> lfop = lookfor(something_in("+-*/()"), lambda x: f"op{x}")
|
|
>>> searcher = concurent_broadcast(list_sink, [lfop])
|
|
>>> for i in '12+3+234':
|
|
... searcher.send(i)
|
|
>>> a = searcher.throw(STOOOP)
|
|
>>> print(a)
|
|
['1', '2', "op['+']", '3', "op['+']", '2', '3', '4']
|
|
|
|
>>> # need to remake a searcher otherwise it remenbers old ans
|
|
>>> searcher = concurent_broadcast(list_sink, [lfop])
|
|
>>> for i in '12*(3+234)':
|
|
... searcher.send(i)
|
|
>>> a = searcher.throw(STOOOP)
|
|
>>> print(a)
|
|
['1', '2', "op['*']", "op['(']", '3', "op['+']", '2', '3', '4', "op[')']"]
|
|
|
|
>>> lfsqrt = lookfor(maybe_it_is("sqrt"), lambda x: f"op['sqrt']")
|
|
>>> searcher = concurent_broadcast(list_sink, [lfop, lfsqrt])
|
|
>>> for i in '3+2*sqrt(3)':
|
|
... searcher.send(i)
|
|
>>> a = searcher.throw(STOOOP)
|
|
>>> print(a)
|
|
['3', "op['+']", '2', "op['*']", "op['sqrt']", "op['(']", '3', "op[')']"]
|
|
"""
|
|
try:
|
|
target_ = target()
|
|
except TypeError:
|
|
target_ = target
|
|
|
|
lookfors_ = [remember_lookfor(lkf) for lkf in lookfors]
|
|
|
|
try:
|
|
while True:
|
|
found = False
|
|
tok = yield
|
|
if tok is not None:
|
|
for lf in lookfors_:
|
|
lf_ans = lf.send(tok)
|
|
if lf_ans and lf_ans != "maybe":
|
|
found = lf_ans
|
|
break
|
|
|
|
if found:
|
|
for lf in lookfors_:
|
|
lf.throw(RESTAAART)
|
|
for i in found:
|
|
target_.send(i)
|
|
except STOOOP as err:
|
|
for lf in lookfors_:
|
|
last = lf.throw(RESTAAART)
|
|
for i in last:
|
|
target_.send(i)
|
|
yield target_.throw(err)
|
|
|
|
@coroutine
|
|
def missing_times(target):
|
|
""" Coroutine which send a "*" when it's missing
|
|
|
|
Cases where a "*" is missing:
|
|
- 2x or yx or )x
|
|
- 2( or y( or )(
|
|
|
|
:param target: target to send data to
|
|
|
|
:example:
|
|
|
|
>>> miss_time = missing_times(list_sink)
|
|
>>> for i in "2a":
|
|
... miss_time.send(i)
|
|
>>> a = miss_time.throw(STOOOP)
|
|
>>> print(a)
|
|
['2', '*', 'a']
|
|
>>> miss_time = missing_times(list_sink)
|
|
>>> for i in "ab":
|
|
... miss_time.send(i)
|
|
>>> a = miss_time.throw(STOOOP)
|
|
>>> print(a)
|
|
['a', '*', 'b']
|
|
>>> miss_time = missing_times(list_sink)
|
|
>>> for i in "a(":
|
|
... miss_time.send(i)
|
|
>>> a = miss_time.throw(STOOOP)
|
|
>>> print(a)
|
|
['a', '*', '(']
|
|
>>> miss_time = missing_times(list_sink)
|
|
>>> for i in "2(":
|
|
... miss_time.send(i)
|
|
>>> a = miss_time.throw(STOOOP)
|
|
>>> print(a)
|
|
['2', '*', '(']
|
|
>>> miss_time = missing_times(list_sink)
|
|
>>> for i in ")(":
|
|
... miss_time.send(i)
|
|
>>> a = miss_time.throw(STOOOP)
|
|
>>> print(a)
|
|
[')', '*', '(']
|
|
|
|
>>> miss_time = missing_times(list_sink)
|
|
>>> for i in "3+4":
|
|
... miss_time.send(i)
|
|
>>> a = miss_time.throw(STOOOP)
|
|
>>> print(a)
|
|
['3', '+', '4']
|
|
"""
|
|
try:
|
|
target_ = target()
|
|
except TypeError:
|
|
target_ = target
|
|
|
|
previous = None
|
|
try:
|
|
while True:
|
|
tok = yield
|
|
if not previous is None:
|
|
previous = None
|
|
|
|
if isinstance(tok, str):
|
|
if tok == '(':
|
|
target_.send("*")
|
|
elif not is_operator(tok) and tok != ')':
|
|
target_.send("*")
|
|
|
|
if isinstance(tok, int) or \
|
|
(isinstance(tok, str) and \
|
|
not is_operator(tok) and \
|
|
not tok == '('):
|
|
previous = tok
|
|
|
|
target_.send(tok)
|
|
|
|
except STOOOP as err:
|
|
yield target_.throw(err)
|
|
|
|
@coroutine
|
|
def lookforNumbers(target):
|
|
""" Coroutine which parse numbers
|
|
|
|
|
|
:exemple:
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "12+1234*67":
|
|
... str2list.send(i)
|
|
>>> a = str2list.throw(STOOOP)
|
|
>>> print(a)
|
|
[12, '+', 1234, '*', 67]
|
|
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "1.2+12.34*67":
|
|
... str2list.send(i)
|
|
>>> a = str2list.throw(STOOOP)
|
|
>>> print(a)
|
|
[Decimal('1.2'), '+', Decimal('12.34'), '*', 67]
|
|
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "12.3.4*67":
|
|
... str2list.send(i)
|
|
Traceback (most recent call last):
|
|
...
|
|
mapytex.calculus.core.str2.ParsingError: Can't build a number with 2 dots (current is 12.3)
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in ".34*67":
|
|
... a = str2list.send(i)
|
|
Traceback (most recent call last):
|
|
...
|
|
mapytex.calculus.core.str2.ParsingError: Can't build a number starting with a dot
|
|
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "12-34":
|
|
... str2list.send(i)
|
|
>>> a = str2list.throw(STOOOP)
|
|
>>> print(a)
|
|
[12, '+', -34]
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "-12+34":
|
|
... str2list.send(i)
|
|
>>> a = str2list.throw(STOOOP)
|
|
>>> print(a)
|
|
[-12, '+', 34]
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "3-1.2":
|
|
... str2list.send(i)
|
|
>>> a = str2list.throw(STOOOP)
|
|
>>> print(a)
|
|
[3, '+', Decimal('-1.2')]
|
|
>>> str2list = lookforNumbers(list_sink)
|
|
>>> for i in "3-(1.2)":
|
|
... str2list.send(i)
|
|
>>> a = str2list.throw(STOOOP)
|
|
>>> print(a)
|
|
[3, '+', '-', '(', Decimal('1.2'), ')']
|
|
|
|
"""
|
|
try:
|
|
target_ = target()
|
|
except TypeError:
|
|
target_ = target
|
|
|
|
current = ""
|
|
try:
|
|
while True:
|
|
tok = yield
|
|
if tok is not None:
|
|
try:
|
|
int(tok)
|
|
except ValueError:
|
|
if tok == '.':
|
|
if "." in current:
|
|
raise ParsingError(f"Can't build a number with 2 dots (current is {current})")
|
|
elif not current:
|
|
raise ParsingError(f"Can't build a number starting with a dot")
|
|
else:
|
|
current += tok
|
|
elif tok == '-':
|
|
if current:
|
|
target_.send(typifiy_numbers(current))
|
|
target_.send('+')
|
|
current = tok
|
|
else:
|
|
if current == '-':
|
|
target_.send(current)
|
|
current = ""
|
|
elif current:
|
|
target_.send(typifiy_numbers(current))
|
|
current = ""
|
|
target_.send(tok)
|
|
else:
|
|
current += tok
|
|
|
|
except STOOOP as err:
|
|
if current:
|
|
target_.send(typifiy_numbers(current))
|
|
yield target_.throw(err)
|
|
|
|
def typifiy_numbers(number):
|
|
""" Transform a str number into a integer or a decimal """
|
|
try:
|
|
return int(number)
|
|
except ValueError:
|
|
return Decimal(number)
|
|
|
|
@coroutine
|
|
def pparser(target):
|
|
""" Parenthesis parser sink
|
|
|
|
:example:
|
|
|
|
>>> pp = pparser(list_sink)
|
|
>>> for i in "buio":
|
|
... pp.send(i)
|
|
>>> a = pp.throw(STOOOP)
|
|
>>> a
|
|
['b', 'u', 'i', 'o']
|
|
>>> pp = pparser(list_sink)
|
|
>>> for i in "agg(bcz)e(i(o))":
|
|
... pp.send(i)
|
|
>>> a = pp.throw(STOOOP)
|
|
>>> a
|
|
['a', 'g', 'g', ['b', 'c', 'z'], 'e', ['i', ['o']]]
|
|
|
|
"""
|
|
target_ = target()
|
|
|
|
try:
|
|
while True:
|
|
caract = yield
|
|
if caract == "(":
|
|
a = yield from pparser(target)
|
|
target_.send(a)
|
|
elif caract == ")":
|
|
a = target_.throw(STOOOP)
|
|
return a
|
|
else:
|
|
target_.send(caract)
|
|
except STOOOP as err:
|
|
yield target_.throw(err)
|
|
|
|
@coroutine
|
|
def list_sink():
|
|
""" Testing sink for coroutines
|
|
|
|
:example:
|
|
|
|
>>> sink = list_sink()
|
|
>>> for i in '12+34 * 3':
|
|
... sink.send(i)
|
|
>>> a = sink.throw(STOOOP)
|
|
>>> a
|
|
['1', '2', '+', '3', '4', ' ', '*', ' ', '3']
|
|
|
|
"""
|
|
ans = list()
|
|
try:
|
|
while True:
|
|
c = (yield)
|
|
if c is not None:
|
|
ans.append(c)
|
|
except STOOOP:
|
|
yield ans
|
|
|
|
def str2(sink, convert_to_mo=True):
|
|
""" Return a pipeline which parse an expression with the sink as an endpont
|
|
|
|
:example:
|
|
|
|
>>> str2nestedlist = str2(list_sink)
|
|
>>> exp = "12+3*4"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[<MOnumber 12>, '+', <MOnumber 3>, '*', <MOnumber 4>]
|
|
>>> str2nestedlist = str2(list_sink, False)
|
|
>>> exp = "12+3*4"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[12, '+', 3, '*', 4]
|
|
>>> exp = "12*3+4"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[12, '*', 3, '+', 4]
|
|
>>> exp = "12*(3+4)"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[12, '*', [3, '+', 4]]
|
|
>>> exp = "12(3+4)"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[12, '*', [3, '+', 4]]
|
|
>>> exp = "2a(3+4)"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[2, '*', 'a', '*', [3, '+', 4]]
|
|
>>> exp = "-12(3-4)"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[-12, '*', [3, '+', -4]]
|
|
>>> exp = "-a(3-4)"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
['-', 'a', '*', [3, '+', -4]]
|
|
>>> exp = "3-a"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[3, '+', '-', 'a']
|
|
>>> exp = "1-(3+4)"
|
|
>>> t = str2nestedlist(exp)
|
|
>>> print(t)
|
|
[1, '+', '-', [3, '+', 4]]
|
|
|
|
>>> from .tree import MutableTree
|
|
>>> str2tree = str2(MutableTree.sink)
|
|
>>> exp = "12+3*4"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
+
|
|
> 12
|
|
> *
|
|
| > 3
|
|
| > 4
|
|
>>> exp = "12*3+4"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
+
|
|
> *
|
|
| > 12
|
|
| > 3
|
|
> 4
|
|
>>> exp = "12*(3+4)"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
*
|
|
> 12
|
|
> +
|
|
| > 3
|
|
| > 4
|
|
>>> exp = "12(3+4)"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
*
|
|
> 12
|
|
> +
|
|
| > 3
|
|
| > 4
|
|
>>> exp = "-12(3-4)"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
*
|
|
> -12
|
|
> +
|
|
| > 3
|
|
| > -4
|
|
>>> exp = "-a(3-4)"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
-
|
|
> None
|
|
> *
|
|
| > a
|
|
| > +
|
|
| | > 3
|
|
| | > -4
|
|
>>> exp = "3-a"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
+
|
|
> 3
|
|
> -
|
|
| > None
|
|
| > a
|
|
>>> exp = "1-(3+4)"
|
|
>>> t = str2tree(exp)
|
|
>>> print(t)
|
|
+
|
|
> 1
|
|
> -
|
|
| > None
|
|
| > +
|
|
| | > 3
|
|
| | > 4
|
|
|
|
"""
|
|
lfop = lookfor(is_operator)
|
|
operator_corout = partial(concurent_broadcast, lookfors=[lfop])
|
|
def pipeline(expression):
|
|
if convert_to_mo:
|
|
str2_corout = lookforNumbers(operator_corout(missing_times(moify(pparser(sink)))))
|
|
else:
|
|
str2_corout = lookforNumbers(operator_corout(missing_times(pparser(sink))))
|
|
|
|
for i in expression:
|
|
str2_corout.send(i)
|
|
a = str2_corout.throw(STOOOP)
|
|
|
|
return a
|
|
|
|
return pipeline
|
|
|
|
|
|
str2nestedlist = str2(list_sink)
|
|
|
|
|
|
# -----------------------------
|
|
# Reglages pour 'vim'
|
|
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
|
# cursor: 16 del
|