# LinkExchange - Universal link exchange service client
# Copyright (C) 2009 Konstantin Korikov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# NOTE: In the context of the Python environment, I interpret "dynamic
# linking" as importing -- thus the LGPL applies to the contents of
# the modules, but make no requirements on code importing these
# modules.
"""
>>> import StringIO
>>> from linkexchange import config
>>> cfg_lines = [
... '[options]',
... 'host=example.com',
... '',
... '[client-1]',
... 'type=sape',
... 'user=user12345',
... 'db_driver.type=mem',
... 'server-1=http://server1.com',
... 'server-2=http://server2.com',
... '',
... '[client-2]',
... 'type=linkfeed',
... 'user=user12345',
... 'db_driver.type=shelve',
... 'db_driver.filename=linkfeed-XXX.db',
... '',
... '[client-3]',
... 'type=trustlink',
... 'user=user12345',
... 'db_driver.type=mem',
... 'link_template = "%%(anchor)s %%(text)s"',
... '',
... '[formatter-1]',
... 'type=inline',
... 'count=none',
... 'delimiter=u" | "',
... ]
>>> vars = {}
>>> cfg_file = StringIO.StringIO('\\n'.join(cfg_lines))
>>> result = config.file_config(vars, cfg_file)
>>> result == [cfg_file]
True
>>> vars['options']['host']
'example.com'
>>> vars['platform'].clients[0].user
'user12345'
>>> vars['platform'].clients[0].server_list[0]
'http://server1.com'
>>> vars['platform'].clients[0].server_list[1]
'http://server2.com'
>>> vars['platform'].clients[1].db_driver.filename
'linkfeed-XXX.db'
>>> vars['platform'].clients[2].link_template
'%(anchor)s %(text)s'
>>> vars['formatters'][0].count
>>> vars['formatters'][0].delimiter
u' | '
>>> import os, tempfile
>>> cfg_fd, cfg_file = tempfile.mkstemp()
>>> os.close(cfg_fd)
>>> open(cfg_file, 'w').write('\\n'.join(cfg_lines))
>>> result = config.file_config(vars, cfg_file)
>>> result == [cfg_file]
True
>>> os.unlink(cfg_file)
>>> result = config.file_config(vars, cfg_file)
>>> result == []
True
"""
import sys
import ConfigParser
import shlex
from linkexchange.utils import load_plugin
from linkexchange.platform import Platform
class ConfigError(Exception):
"Configuration error"
def file_config(vars, fname, defaults = None, prefix = '',
default_encoding = 'utf-8'):
if defaults is None:
defaults = {}
cp = ConfigParser.SafeConfigParser(defaults)
try:
if hasattr(cp, 'readfp') and hasattr(fname, 'readline'):
cp.readfp(fname)
result = [fname]
else:
result = cp.read(fname)
if sys.version_info < (2, 4):
if cp.sections():
if type(fname) == list:
result = fname
else:
result = [fname]
else:
result = []
except (ConfigParser.MissingSectionHeaderError,
ConfigParser.ParsingError), e:
raise ConfigError("Parsing error: %s" % str(e))
if not result:
return result
_clients = []
_formatters = []
_options = {}
try:
encoding = cp.get('options', 'config_encoding')
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
encoding = default_encoding
for sec in cp.sections():
opts = dict([(o, cp.get(sec, o))
for o in cp.options(sec) if o not in defaults])
cn = fn = None
if sec == 'client':
cn = 1
elif sec.startswith('client-'):
cn = int(sec.split('-', 1)[1])
elif sec == 'formatter':
fn = 1
elif sec.startswith('formatter-'):
fn = int(sec.split('-', 1)[1])
elif sec == 'options':
for k, v in opts.items():
_options[k] = _eval_value(v, encoding)
if cn:
_clients.extend([None] * max(0, cn - len(_clients)))
_clients[cn - 1] = _parse_plugin_spec(opts, encoding)
if fn:
_formatters.extend([None] * max(0, fn - len(_formatters)))
_formatters[fn - 1] = _parse_plugin_spec(opts, encoding)
def load_client(spec):
try:
return load_plugin('linkexchange.clients', spec)
except ImportError:
raise ConfigError("Client not found: %s" % spec[0])
def load_formatter(spec):
try:
return load_plugin('linkexchange.formatters', spec)
except ImportError:
raise ConfigError("Formatter not found: %s" % spec[0])
clients = [load_client(x) for x in _clients if x is not None]
vars[prefix + 'platform'] = Platform(clients)
vars[prefix + 'formatters'] = [load_formatter(x)
for x in _formatters if x is not None]
vars[prefix + 'options'] = _options
return result
def _eval_value(value, encoding):
value = value.strip()
try:
return int(value)
except ValueError:
pass
try:
return long(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
value_lower = value.lower()
if value_lower in ('true', 'on', 'enabled'):
return True
if value_lower in ('false', 'off', 'disabled'):
return False
if value_lower == 'none':
return None
conv_str = lambda x: x
unquote_str = lambda x: x
if value.endswith('"'):
if value.startswith('u"'):
conv_str = lambda x: unicode(x, encoding)
value = value[1:]
if value.startswith('"'):
unquote_str = lambda x: ''.join(shlex.split(x))
return conv_str(unquote_str(value))
def _parse_plugin_spec(dic, encoding, toplevel=''):
opts = {}
compound = {}
for k, v in dic.items():
if '.' in k:
k1, k2 = k.split('.', 1)
compound.setdefault(k1, {})
compound[k1][k2] = v
elif k.endswith('_list'):
opts[k] = [_eval_value(x, encoding) for x in v.split(',')]
elif '-' in k and k.split('-')[-1].isdigit():
a, n = k.split('-')
kk = a + '_list'
nn = int(n) - 1
opts.setdefault(kk, [])
opts[kk].extend([None] * max(0, nn - len(opts[kk]) + 1))
opts[kk][nn] = _eval_value(v, encoding)
else:
opts[k] = _eval_value(v, encoding)
for k, v in compound.items():
opts[k] = _parse_plugin_spec(v, encoding,
toplevel + k + '.')
try:
plugin_type = opts.pop('type')
except KeyError:
raise ConfigError(("The %s option(s) is set, "
"but required %s is not set" % (', '.join(
[toplevel + x for x in opts.keys()]),
toplevel + 'type')))
return (plugin_type, [], opts)
if __name__ == "__main__":
import doctest
doctest.testmod()