1
0
mirror of https://github.com/fraoustin/piwigotools.git synced 2025-06-07 07:56:31 +00:00

Merge branch 'feature-downandup' into develop

This commit is contained in:
fraoustin 2016-01-28 11:35:23 +01:00
commit 6e88197efd
14 changed files with 1421 additions and 6 deletions

View File

@ -1 +1 @@
Name::name@mail.com
Frédéric Aoustin::fraoustin@gmail.com

View File

@ -0,0 +1,11 @@
0.0.2
=====
- add verb upload, download and ws
- integrate progressbar
- manage thread
0.0.1
=====
init

View File

@ -1 +1,58 @@
piwigotools
piwigotools
===========
Piwigo is a famous open-source online photo gallery.
Piwigotools is a module python for manage your piwigo gallery.
The module add command "piwigo"
Installation
------------
Warning
~~~~~~~
Piwigotools needs the progressbar module.
But progressbar moduel is not compatible with python3
install for python2.7
::
pip install progressbar
install for python 3
::
git clone https://github.com/coagulant/progressbar-python3.git
cd progressbar-python3
python setup.py install
Install
~~~~~~~
::
pip install piwigotools
Or
::
git clone https://github.com/fraoustin/piwigotools.git
cd piwigotools
python setup.py install
Usage
-----
::
piwigo verb --param1=value1 --param2=value2 ...

View File

@ -0,0 +1 @@
piwigo

View File

@ -5,5 +5,226 @@
Module piwigotools
"""
__version_info__ = (0, 0, 1)
__version_info__ = (0, 0, 2)
__version__ = '.'.join([str(val) for val in __version_info__])
import inspect
import requests
import piwigo
class LoginException(Exception):
def __str__(self):
return "You are not logged"
class PiwigoException(Exception):
def __init__(self, strerr):
self._strerr = strerr
def __str__(self):
return self._strerr
class PiwigoExistException(PiwigoException):
def __init__(self, strerr):
PiwigoException.__init__(self, strerr)
class Piwigo(piwigo.Piwigo):
"""
describe piwigo gallery
"""
def __init__(self, url):
piwigo.Piwigo.__init__(self, url)
self._login = False
def login(self, username, password):
"""
login on piwigo gallery
"""
self.pwg.session.login(username=username, password=password)
self._login = True
return True
def logout(self):
"""
logout on piwigo gallery
"""
self.pwg.session.logout()
self._login = False
return True
@property
def plan(self):
return { i["name"] : i["id"] for i in self.pwg.categories.getList(recursive=True, fullname=True)['categories'] }
def _checkarg(fn):
def checking(self, *args, **kw):
args = list(args)
# manage path
if inspect.getargspec(fn).args.count('path'):
pos = inspect.getargspec(fn).args.index('path') -1
if args[pos][-2:] == ' /' : args[pos] = args[pos][:-2]
args = tuple(args)
return fn(self, *args, **kw)
return checking
def _checklogin(fn):
def checking(self, *args, **kw):
if self._login:
return fn(self, *args, **kw)
raise LoginException()
return checking
@property
@_checklogin
def token(self):
"""
return pwg_token
"""
return self.pwg.session.getStatus()["pwg_token"]
def islogged(self):
try:
self.token
except LoginException:
return False
return True
@_checkarg
def iscategory(self, path):
if path in self.plan:
return True
return False
@_checkarg
def idcategory(self, path):
if not self.iscategory(path):
raise PiwigoExistException("category %s not exist" % path)
return self.plan[path]
@_checkarg
def images(self, path, **kw):
"""
return list of file name image for path
"""
kw["cat_id"]= self.idcategory(path)
kw["per_page"] = 200
kw["page"] = 0
imgs = {}
loop = True
while loop:
req = self.pwg.categories.getImages(**kw)
for img in req["images"]:
imgs[img["file"]] = img
if req["paging"]["count"] < req["paging"]["per_page"]:
loop = False
return imgs
@_checkarg
def sublevels(self, path, **kw):
"""
return list of category in for path
"""
kw["cat_id"]= self.idcategory(path)
return { i["name"] : i for i in self.pwg.categories.getList(**kw)['categories'] if i["id"] != kw["cat_id"] }
@_checkarg
def isimage(self, path):
img = path.split(' / ')[-1]
path = ' / '.join(path.split(' / ')[:-1])
if img in self.images(path):
return True
return False
@_checkarg
def idimage(self, path):
if not self.isimage(path):
raise PiwigoExistException("image %s not exist" % path)
img = path.split(' / ')[-1]
path = ' / '.join(path.split(' / ')[:-1])
return self.images(path)[img]["id"]
@_checkarg
@_checklogin
def mkdir(self, path, **kw):
"""
create a category named path
"""
kw['name'] = path.split(' / ')[-1]
parent = ' / '.join(path.split(' / ')[:-1])
if parent and not self.iscategory(parent):
raise PiwigoExistException("category %s not exist" % parent)
if parent : kw['parent'] = self.plan[parent]
self.pwg.categories.add(**kw)
return self.idcategory(path)
@_checkarg
@_checklogin
def makedirs(self, path, **kw):
"""
recursive category create function
"""
pp = ''
for p in path.split(' / '):
pp = '%s%s' % (pp, p)
if not self.iscategory(pp):
self.mkdir(pp, **kw)
pp = '%s / ' % pp
return self.idcategory(path)
@_checkarg
@_checklogin
def removedirs(self, path, **kw):
"""
remove (delete) category
"""
self.pwg.categories.delete(category_id=self.idcategory(path), pwg_token=self.token, **kw)
return True
@_checkarg
@_checklogin
def upload(self, image, path="", **kw):
"""
upload image in path
"""
kw["image"] = image
if len(path):
if not self.iscategory(path):
raise PiwigoExistException("category %s not exist" % parent)
kw['category'] = self.idcategory(path)
return self.pwg.images.addSimple(**kw)['image_id']
@_checkarg
@_checklogin
def download(self, path, dst, **kw):
"""
download image dst
"""
if not self.isimage(path):
raise PiwigoException("image %s not exist" % path)
img = path.split(' / ')[-1]
path = ' / '.join(path.split(' / ')[:-1])
url = self.images(path)[img]['element_url']
with open(dst, 'wb') as img:
r = requests.get(url)
img.write(r.content)
r.connection.close()
return True
@_checklogin
def remove(self, path, **kw):
"""
remove (delete) image
"""
if not self.isimage(path):
raise PiwigoException("image %s not exist" % path)
self.pwg.images.delete(image_id= self.idimage(path), pwg_token=self.token)
return True

124
piwigotools/interface.py Normal file
View File

@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
import sys
import os
import threading
try:
import Queue as queue
except:
import queue
import time
import piwigotools.progressbar as progressbar
class Step(threading.Thread):
def __init__(self, qin, qout, qerr):
threading.Thread.__init__(self)
self.qin = qin
self.qout = qout
self.qerr = qerr
def run(self):
while not self.qin.empty():
try:
call, arg, kw = self.qin.get_nowait()
try:
call(*arg, **kw)
except Exception as e:
self.qerr.put([call, arg, kw, e])
self.qout.put([call, arg, kw])
except queue.Empty:
pass
class Run:
def __init__(self, name, cnt=1):
self._name = name
self._qin = queue.Queue()
self._qout = queue.Queue()
self._qerr = queue.Queue()
self._threads = [ Step(self._qin, self._qout, self._qerr) for i in range(cnt)]
@property
def error(self):
"""
return true if _qerr.qsize() > 0
"""
if self._qerr.qsize() > 0:
return True
return False
@property
def strerror(self):
ret = ""
while not self._qerr.empty():
call, arg, kw, e = self._qerr.get_nowait()
ret = "%s%s\n" % (ret, e)
return ret
def add(self, call, arg, kw):
self._qin.put([call, arg, kw])
def start(self):
self._qout.maxsize = self._qin.qsize()
pbar = progressbar.ProgressBar(widgets=['%s ' % self._name,
progressbar.Counter() ,
' on %s ' % self._qin.qsize(),
progressbar.Bar(),
' ',
progressbar.Timer()],
maxval=self._qin.qsize()).start()
for thread in self._threads:
thread.start()
while not self._qout.full():
time.sleep(0.1) # sleep 0.1s
pbar.update(self._qout.qsize())
pbar.finish()
return self._qerr
class StepAnalyse(threading.Thread):
def __init__(self, pbar):
threading.Thread.__init__(self)
self._pbar = pbar
self._stopevent = threading.Event()
def run(self):
self._pbar.start()
i = 0
while not self._stopevent.isSet():
try:
self._pbar.update(i)
i = i + 1
except:
pass
time.sleep(0.1)
def stop(self):
self._stopevent.set()
self._pbar.finish()
class Analyse:
def __init__(self, name):
self._name = name
pbar = progressbar.ProgressBar(widgets=['%s: ' % name,
progressbar.AnimatedMarker(),
' | ',
progressbar.Timer()]
)
self._thread = StepAnalyse(pbar)
def start(self):
self._thread.start()
def stop(self):
self._thread.stop()
def purge_kw(kw, notkw):
return {k : kw[k] for k in kw if k not in notkw}

208
piwigotools/main.py Normal file
View File

@ -0,0 +1,208 @@
# -*- coding: utf-8 -*-
import sys
import os, os.path
import glob
import pprint
try:
from myterm.parser import OptionParser
except:
from optparse import OptionParser
from piwigotools import Piwigo, __version__
from piwigo.ws import Ws
from piwigotools.interface import *
DESCRIPTION = "tools for piwigo gallery"
USAGE = """piwigo verb --param1=value1 --param2=value2
verb list
- upload
- download
- sync
- ws
to get help: piwigo verb --help
"""
AUTHOR = "Frederic Aoustin"
PROG = "piwigo"
VERSION = __version__
VERBS = {
"upload":
{
"usage" : "usage for verb upload",
"description" : "upload file in piwigo gallery",
"arg" :
{
"category" : {"type":"string", "default":"/", "help":"destination category of piwigo gallery"},
"source" : {"type":"string", "default":"*.jpg", "help":"path of upload picture"},
"url" : {"type":"string", "default":"", "help":"url of piwigo gallery"},
"user" : {"type":"string", "default":"", "help":"user of piwigo gallery"},
"password" : {"type":"string", "default":"", "help":"password of piwigo gallery"},
"thread" : {"type":"int", "default":"1", "help":"number of thread"},
},
},
"download":
{
"usage" : "usage for verb download",
"description" : "download image from piwigo gallery",
"arg" :
{
"category" : {"type":"string", "default":"/", "help":"source category of piwigo gallery"},
"dest" : {"type":"string", "default":".", "help":"path of destination"},
"url" : {"type":"string", "default":"", "help":"url of piwigo gallery"},
"user" : {"type":"string", "default":"", "help":"user of piwigo gallery"},
"password" : {"type":"string", "default":"", "help":"password of piwigo gallery"},
"thread" : {"type":"int", "default":"1", "help":"number of thread"},
},
},
"sync":
{
"usage" : "usage for verb sync",
"description" : "synchronization between path and piwigo gallery",
"arg" :
{
"category" : {"type":"string", "default":"/", "help":"category of piwigo gallery"},
"source" : {"type":"string", "default":".", "help":"path of picture"},
"url" : {"type":"string", "default":"", "help":"url of piwigo gallery"},
"user" : {"type":"string", "default":"", "help":"user of piwigo gallery"},
"password" : {"type":"string", "default":"", "help":"password of piwigo gallery"},
"thread" : {"type":"int", "default":"1", "help":"number of thread"},
},
},
"ws":
{
"usage" : "usage for verb ws",
"description" : "use web service of piwigo gallery",
"arg" :
{
"method" : {"type":"string", "default":".", "help":"name of web service"},
"url" : {"type":"string", "default":"", "help":"url of piwigo gallery"},
},
},
}
def add_dynamic_option(parser):
# add arg for verb
if not len(sys.argv) > 1:
parser.print_help()
sys.exit(1)
if sys.argv[1] in ("--help", "-h"):
parser.print_help()
parser.print_version()
sys.exit(0)
if sys.argv[1] in ("--version"):
parser.print_version()
sys.exit(0)
verb = sys.argv[1]
arg_know = ['--help']
for arg in VERBS.get(verb, {'arg':{}})['arg']:
kw = VERBS[sys.argv[1]]['arg'][arg]
kw['dest'] = arg
parser.add_option("--%s" % arg, **kw)
arg_know.append("--%s" % arg)
# add arg in argv
for arg in sys.argv[2:]:
if arg[:2] == '--' and arg.split('=')[0] not in arg_know:
arg = arg[2:].split('=')[0]
parser.add_option("--%s" % arg , dest=arg, type="string")
arg_know.append("--%s" % arg)
#check verb
if verb not in VERBS:
parser.print_help()
parser.exit(status=2, msg='verb "%s" unknow\n' % verb)
sys.exit(0)
parser.set_usage(VERBS[verb]["usage"])
parser.description = VERBS[verb]["description"]
if '--help' in sys.argv[1:]:
parser.print_help()
sys.exit(0)
def main():
usage = USAGE
parser = OptionParser(version="%s %s" % (PROG,VERSION), usage=usage)
parser.description= DESCRIPTION
parser.epilog = AUTHOR
try:
add_dynamic_option(parser)
(options, args) = parser.parse_args()
verb = args[0]
if verb == 'ws':
piwigo = Piwigo(url=options.url)
if 'user' and 'password' in options.__dict__:
piwigo.login(options.user, options.password)
kw = purge_kw(options.__dict__,('user','password','url'))
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(Ws(piwigo, options.method)(**kw))
if piwigo.islogged:
piwigo.logout()
if verb == "download":
ana = Analyse('Analyze')
ana.start()
try:
piwigo = Piwigo(url=options.url)
piwigo.login(options.user, options.password)
# check
if not os.path.isdir(options.dest):
os.makedirs(options.dest)
options.dest = os.path.abspath(options.dest)
piwigo.iscategory(options.category)
if options.category[-2:] == ' /' : options.category = options.category[:-2]
# treatment
run = Run(verb, options.thread)
kw = purge_kw(options.__dict__,('user','password','url','dest','category','thread'))
for img in piwigo.images(options.category, **kw):
run.add(piwigo.download,
["%s / %s" % (options.category, str(img)), "%s/%s" % (options.dest, str(img))],
kw)
except Exception as e:
ana.stop()
raise e
ana.stop()
run.start()
piwigo.logout()
if run.error:
parser.error(run.strerror)
if verb == "upload":
ana = Analyse('Analyze')
ana.start()
try:
piwigo = Piwigo(url=options.url)
piwigo.login(options.user, options.password)
# check
piwigo.makedirs(options.category)
# treatment
run = Run(verb, options.thread)
kw = purge_kw(options.__dict__,('user','password','url','source','category','thread'))
for img in glob.glob(options.source):
run.add(piwigo.upload,
[os.path.abspath(img), options.category],
kw)
ana.stop()
except Exception as e:
ana.stop()
raise e
run.start()
piwigo.logout()
if run.error:
parser.error(run.strerror)
except Exception as e:
parser.error(e)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,54 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
# IMPORTANT
# copy of https://github.com/coagulant/progressbar-python3
"""Text progress bar library for Python.
A text progress bar is typically used to display the progress of a long
running operation, providing a visual cue that processing is underway.
The ProgressBar class manages the current progress, and the format of the line
is given by a number of widgets. A widget is an object that may display
differently depending on the state of the progress bar. There are three types
of widgets:
- a string, which always shows itself
- a ProgressBarWidget, which may return a different value every time its
update method is called
- a ProgressBarWidgetHFill, which is like ProgressBarWidget, except it
expands to fill the remaining width of the line.
The progressbar module is very easy to use, yet very powerful. It will also
automatically enable features like auto-resizing when the system supports it.
"""
__author__ = 'Nilton Volpato'
__author_email__ = 'first-name dot last-name @ gmail.com'
__date__ = '2011-05-14'
__version__ = '2.3dev'
from .compat import *
from .widgets import *
from .progressbar import *

View File

@ -0,0 +1,45 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Compatibility methods and classes for the progressbar module."""
# Python 3.x (and backports) use a modified iterator syntax
# This will allow 2.x to behave with 3.x iterators
try:
next
except NameError:
def next(iter):
try:
# Try new style iterators
return iter.__next__()
except AttributeError:
# Fallback in case of a "native" iterator
return iter.next()
# Python < 2.5 does not have "any"
try:
any
except NameError:
def any(iterator):
for item in iterator:
if item: return True
return False

View File

@ -0,0 +1,296 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Main ProgressBar class."""
from __future__ import division
import math
import os
import signal
import sys
import time
try:
from fcntl import ioctl
from array import array
import termios
except ImportError:
pass
from . import widgets
class UnknownLength: pass
class ProgressBar(object):
"""The ProgressBar class which updates and prints the bar.
A common way of using it is like:
>>> pbar = ProgressBar().start()
>>> for i in range(100):
... # do something
... pbar.update(i+1)
...
>>> pbar.finish()
You can also use a ProgressBar as an iterator:
>>> progress = ProgressBar()
>>> for i in progress(some_iterable):
... # do something
...
Since the progress bar is incredibly customizable you can specify
different widgets of any type in any order. You can even write your own
widgets! However, since there are already a good number of widgets you
should probably play around with them before moving on to create your own
widgets.
The term_width parameter represents the current terminal width. If the
parameter is set to an integer then the progress bar will use that,
otherwise it will attempt to determine the terminal width falling back to
80 columns if the width cannot be determined.
When implementing a widget's update method you are passed a reference to
the current progress bar. As a result, you have access to the
ProgressBar's methods and attributes. Although there is nothing preventing
you from changing the ProgressBar you should treat it as read only.
Useful methods and attributes include (Public API):
- currval: current progress (0 <= currval <= maxval)
- maxval: maximum (and final) value
- finished: True if the bar has finished (reached 100%)
- start_time: the time when start() method of ProgressBar was called
- seconds_elapsed: seconds elapsed since start_time and last call to
update
- percentage(): progress in percent [0..100]
"""
__slots__ = ('currval', 'fd', 'finished', 'last_update_time',
'left_justify', 'maxval', 'next_update', 'num_intervals',
'poll', 'seconds_elapsed', 'signal_set', 'start_time',
'term_width', 'update_interval', 'widgets', '_time_sensitive',
'__iterable')
_DEFAULT_MAXVAL = 100
_DEFAULT_TERMSIZE = 80
_DEFAULT_WIDGETS = [widgets.Percentage(), ' ', widgets.Bar()]
def __init__(self, maxval=None, widgets=None, term_width=None, poll=1,
left_justify=True, fd=sys.stderr):
"""Initializes a progress bar with sane defaults."""
# Don't share a reference with any other progress bars
if widgets is None:
widgets = list(self._DEFAULT_WIDGETS)
self.maxval = maxval
self.widgets = widgets
self.fd = fd
self.left_justify = left_justify
self.signal_set = False
if term_width is not None:
self.term_width = term_width
else:
try:
self._handle_resize()
signal.signal(signal.SIGWINCH, self._handle_resize)
self.signal_set = True
except (SystemExit, KeyboardInterrupt): raise
except:
self.term_width = self._env_size()
self.__iterable = None
self._update_widgets()
self.currval = 0
self.finished = False
self.last_update_time = None
self.poll = poll
self.seconds_elapsed = 0
self.start_time = None
self.update_interval = 1
def __call__(self, iterable):
"""Use a ProgressBar to iterate through an iterable."""
try:
self.maxval = len(iterable)
except:
if self.maxval is None:
self.maxval = UnknownLength
self.__iterable = iter(iterable)
return self
def __iter__(self):
return self
def __next__(self):
try:
value = next(self.__iterable)
if self.start_time is None: self.start()
else: self.update(self.currval + 1)
return value
except StopIteration:
self.finish()
raise
# Create an alias so that Python 2.x won't complain about not being
# an iterator.
next = __next__
def _env_size(self):
"""Tries to find the term_width from the environment."""
return int(os.environ.get('COLUMNS', self._DEFAULT_TERMSIZE)) - 1
def _handle_resize(self, signum=None, frame=None):
"""Tries to catch resize signals sent from the terminal."""
h, w = array('h', ioctl(self.fd, termios.TIOCGWINSZ, '\0' * 8))[:2]
self.term_width = w
def percentage(self):
"""Returns the progress as a percentage."""
return self.currval * 100.0 / self.maxval
percent = property(percentage)
def _format_widgets(self):
result = []
expanding = []
width = self.term_width
for index, widget in enumerate(self.widgets):
if isinstance(widget, widgets.WidgetHFill):
result.append(widget)
expanding.insert(0, index)
else:
widget = widgets.format_updatable(widget, self)
result.append(widget)
width -= len(widget)
count = len(expanding)
while count:
portion = max(int(math.ceil(width * 1. / count)), 0)
index = expanding.pop()
count -= 1
widget = result[index].update(self, portion)
width -= len(widget)
result[index] = widget
return result
def _format_line(self):
"""Joins the widgets and justifies the line."""
widgets = ''.join(self._format_widgets())
if self.left_justify: return widgets.ljust(self.term_width)
else: return widgets.rjust(self.term_width)
def _need_update(self):
"""Returns whether the ProgressBar should redraw the line."""
if self.currval >= self.next_update or self.finished: return True
delta = time.time() - self.last_update_time
return self._time_sensitive and delta > self.poll
def _update_widgets(self):
"""Checks all widgets for the time sensitive bit."""
self._time_sensitive = any(getattr(w, 'TIME_SENSITIVE', False)
for w in self.widgets)
def update(self, value=None):
"""Updates the ProgressBar to a new value."""
if value is not None and value is not UnknownLength:
if (self.maxval is not UnknownLength
and not 0 <= value <= self.maxval):
raise ValueError('Value out of range')
self.currval = value
if not self._need_update(): return
if self.start_time is None:
raise RuntimeError('You must call "start" before calling "update"')
now = time.time()
self.seconds_elapsed = now - self.start_time
self.next_update = self.currval + self.update_interval
self.fd.write(self._format_line() + '\r')
self.last_update_time = now
def start(self):
"""Starts measuring time, and prints the bar at 0%.
It returns self so you can use it like this:
>>> pbar = ProgressBar().start()
>>> for i in range(100):
... # do something
... pbar.update(i+1)
...
>>> pbar.finish()
"""
if self.maxval is None:
self.maxval = self._DEFAULT_MAXVAL
self.num_intervals = max(100, self.term_width)
self.next_update = 0
if self.maxval is not UnknownLength:
if self.maxval < 0: raise ValueError('Value out of range')
self.update_interval = self.maxval / self.num_intervals
self.start_time = self.last_update_time = time.time()
self.update(0)
return self
def finish(self):
"""Puts the ProgressBar bar in the finished state."""
self.finished = True
self.update(self.maxval)
self.fd.write('\n')
if self.signal_set:
signal.signal(signal.SIGWINCH, signal.SIG_DFL)

View File

@ -0,0 +1,311 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Default ProgressBar widgets."""
from __future__ import division
import datetime
import math
try:
from abc import ABCMeta, abstractmethod
except ImportError:
AbstractWidget = object
abstractmethod = lambda fn: fn
else:
AbstractWidget = ABCMeta('AbstractWidget', (object,), {})
def format_updatable(updatable, pbar):
if hasattr(updatable, 'update'): return updatable.update(pbar)
else: return updatable
class Widget(AbstractWidget):
"""The base class for all widgets.
The ProgressBar will call the widget's update value when the widget should
be updated. The widget's size may change between calls, but the widget may
display incorrectly if the size changes drastically and repeatedly.
The boolean TIME_SENSITIVE informs the ProgressBar that it should be
updated more often because it is time sensitive.
"""
TIME_SENSITIVE = False
__slots__ = ()
@abstractmethod
def update(self, pbar):
"""Updates the widget.
pbar - a reference to the calling ProgressBar
"""
class WidgetHFill(Widget):
"""The base class for all variable width widgets.
This widget is much like the \\hfill command in TeX, it will expand to
fill the line. You can use more than one in the same line, and they will
all have the same width, and together will fill the line.
"""
@abstractmethod
def update(self, pbar, width):
"""Updates the widget providing the total width the widget must fill.
pbar - a reference to the calling ProgressBar
width - The total width the widget must fill
"""
class Timer(Widget):
"""Widget which displays the elapsed seconds."""
__slots__ = ('format_string',)
TIME_SENSITIVE = True
def __init__(self, format='Elapsed Time: %s'):
self.format_string = format
@staticmethod
def format_time(seconds):
"""Formats time as the string "HH:MM:SS"."""
return str(datetime.timedelta(seconds=int(seconds)))
def update(self, pbar):
"""Updates the widget to show the elapsed time."""
return self.format_string % self.format_time(pbar.seconds_elapsed)
class ETA(Timer):
"""Widget which attempts to estimate the time of arrival."""
TIME_SENSITIVE = True
def update(self, pbar):
"""Updates the widget to show the ETA or total time when finished."""
if pbar.currval == 0:
return 'ETA: --:--:--'
elif pbar.finished:
return 'Time: %s' % self.format_time(pbar.seconds_elapsed)
else:
elapsed = pbar.seconds_elapsed
eta = elapsed * pbar.maxval / pbar.currval - elapsed
return 'ETA: %s' % self.format_time(eta)
class FileTransferSpeed(Widget):
"""Widget for showing the transfer speed (useful for file transfers)."""
FORMAT = '%6.2f %s%s/s'
PREFIXES = ' kMGTPEZY'
__slots__ = ('unit',)
def __init__(self, unit='B'):
self.unit = unit
def update(self, pbar):
"""Updates the widget with the current SI prefixed speed."""
if pbar.seconds_elapsed < 2e-6 or pbar.currval < 2e-6: # =~ 0
scaled = power = 0
else:
speed = pbar.currval / pbar.seconds_elapsed
power = int(math.log(speed, 1000))
scaled = speed / 1000.**power
return self.FORMAT % (scaled, self.PREFIXES[power], self.unit)
class AnimatedMarker(Widget):
"""An animated marker for the progress bar which defaults to appear as if
it were rotating.
"""
__slots__ = ('markers', 'curmark')
def __init__(self, markers='|/-\\'):
self.markers = markers
self.curmark = -1
def update(self, pbar):
"""Updates the widget to show the next marker or the first marker when
finished"""
if pbar.finished: return self.markers[0]
self.curmark = (self.curmark + 1) % len(self.markers)
return self.markers[self.curmark]
# Alias for backwards compatibility
RotatingMarker = AnimatedMarker
class Counter(Widget):
"""Displays the current count."""
__slots__ = ('format_string',)
def __init__(self, format='%d'):
self.format_string = format
def update(self, pbar):
return self.format_string % pbar.currval
class Percentage(Widget):
"""Displays the current percentage as a number with a percent sign."""
def update(self, pbar):
return '%3d%%' % pbar.percentage()
class FormatLabel(Timer):
"""Displays a formatted label."""
mapping = {
'elapsed': ('seconds_elapsed', Timer.format_time),
'finished': ('finished', None),
'last_update': ('last_update_time', None),
'max': ('maxval', None),
'seconds': ('seconds_elapsed', None),
'start': ('start_time', None),
'value': ('currval', None)
}
__slots__ = ('format_string',)
def __init__(self, format):
self.format_string = format
def update(self, pbar):
context = {}
for name, (key, transform) in self.mapping.items():
try:
value = getattr(pbar, key)
if transform is None:
context[name] = value
else:
context[name] = transform(value)
except: pass
return self.format_string % context
class SimpleProgress(Widget):
"""Returns progress as a count of the total (e.g.: "5 of 47")."""
__slots__ = ('sep',)
def __init__(self, sep=' of '):
self.sep = sep
def update(self, pbar):
return '%d%s%d' % (pbar.currval, self.sep, pbar.maxval)
class Bar(WidgetHFill):
"""A progress bar which stretches to fill the line."""
__slots__ = ('marker', 'left', 'right', 'fill', 'fill_left')
def __init__(self, marker='#', left='|', right='|', fill=' ',
fill_left=True):
"""Creates a customizable progress bar.
marker - string or updatable object to use as a marker
left - string or updatable object to use as a left border
right - string or updatable object to use as a right border
fill - character to use for the empty part of the progress bar
fill_left - whether to fill from the left or the right
"""
self.marker = marker
self.left = left
self.right = right
self.fill = fill
self.fill_left = fill_left
def update(self, pbar, width):
"""Updates the progress bar and its subcomponents."""
left, marked, right = (format_updatable(i, pbar) for i in
(self.left, self.marker, self.right))
width -= len(left) + len(right)
# Marked must *always* have length of 1
if pbar.maxval:
marked *= int(pbar.currval / pbar.maxval * width)
else:
marked = ''
if self.fill_left:
return '%s%s%s' % (left, marked.ljust(width, self.fill), right)
else:
return '%s%s%s' % (left, marked.rjust(width, self.fill), right)
class ReverseBar(Bar):
"""A bar which has a marker which bounces from side to side."""
def __init__(self, marker='#', left='|', right='|', fill=' ',
fill_left=False):
"""Creates a customizable progress bar.
marker - string or updatable object to use as a marker
left - string or updatable object to use as a left border
right - string or updatable object to use as a right border
fill - character to use for the empty part of the progress bar
fill_left - whether to fill from the left or the right
"""
self.marker = marker
self.left = left
self.right = right
self.fill = fill
self.fill_left = fill_left
class BouncingBar(Bar):
def update(self, pbar, width):
"""Updates the progress bar and its subcomponents."""
left, marker, right = (format_updatable(i, pbar) for i in
(self.left, self.marker, self.right))
width -= len(left) + len(right)
if pbar.finished: return '%s%s%s' % (left, width * marker, right)
position = int(pbar.currval % (width * 2 - 1))
if position > width: position = width * 2 - position
lpad = self.fill * (position - 1)
rpad = self.fill * (width - len(marker) - len(lpad))
# Swap if we want to bounce the other way
if not self.fill_left: rpad, lpad = lpad, rpad
return '%s%s%s%s%s' % (left, lpad, marker, rpad, right)

View File

@ -11,8 +11,9 @@ import piwigotools
NAME = "piwigotools"
VERSION = piwigotools.__version__
DESC = "piwigotools description"
URLPKG = "https://url/of/piwigotools/website"
DESC = "mange your piwigo gallery by command piwigo"
URLPKG = "https://github.com/fraoustin/piwigotools"
HERE = os.path.abspath(os.path.dirname(__file__))
@ -44,4 +45,9 @@ setup(
install_requires=REQUIRED,
url=URLPKG,
classifiers=CLASSIFIED,
entry_points = {
'console_scripts': [
'piwigo = piwigotools.main:main',
],
},
)

BIN
tests/samplepiwigotools.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

View File

@ -6,13 +6,94 @@
"""
import unittest
import os
import os.path
from piwigotools import Piwigo, LoginException, PiwigoExistException
class BasicTestCase(unittest.TestCase):
"""
Class for Basic Test for piwigotools
"""
def setUp(self):
pass
self.url = "http://mygallery.piwigo.com/"
self.usertest = 'USERTEST'
self.passwordtest = 'xxxxxx'
self.piwigo = Piwigo(self.url)
def test_basic(self):
self.assertTrue(self.piwigo.pwg.getVersion())
def test_checkLogin(self):
self.assertTrue(self.piwigo.login(self.usertest, self.passwordtest))
self.assertTrue(self.piwigo.logout())
self.assertRaises(LoginException, self.piwigo.mkdir)
self.assertRaises(LoginException, self.piwigo.makedirs)
self.assertRaises(LoginException, self.piwigo.upload)
def test_createCategory(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.assertTrue(self.piwigo.mkdir('level'))
self.assertTrue(self.piwigo.mkdir('level / sublevel'))
self.assertTrue(self.piwigo.makedirs('level2 / sublevel2'))
self.piwigo.removedirs('level2')
self.piwigo.removedirs('level')
self.piwigo.logout()
def test_checkpath(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.piwigo.mkdir('level')
self.assertTrue(self.piwigo.iscategory('level'))
self.assertTrue(self.piwigo.iscategory('level /'))
self.piwigo.removedirs('level')
self.piwigo.logout()
def test_removeCategory(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.piwigo.makedirs('level2 / sublevel2')
self.assertTrue(self.piwigo.removedirs('level2'))
self.assertFalse(self.piwigo.iscategory('level2'))
self.piwigo.logout()
def test_uploadImage(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.piwigo.mkdir('level')
img = os.path.join(os.path.dirname(os.path.abspath(__file__)),'samplepiwigotools.jpg')
id = self.piwigo.upload(image=img, path="level")
self.assertTrue(id)
self.assertTrue(self.piwigo.isimage('level / samplepiwigotools.jpg'))
self.piwigo.pwg.images.delete(image_id=id, pwg_token=self.piwigo.token)
self.piwigo.removedirs('level')
self.piwigo.logout()
def test_removeImage(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.piwigo.mkdir('level')
img = os.path.join(os.path.dirname(os.path.abspath(__file__)),'samplepiwigotools.jpg')
id = self.piwigo.upload(image=img, path="level")
self.assertTrue(self.piwigo.remove('level / samplepiwigotools.jpg'))
self.assertFalse(self.piwigo.isimage('level / samplepiwigotools.jpg'))
self.piwigo.removedirs('level')
self.piwigo.logout()
def test_sublevel(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.piwigo.makedirs('level2 / sublevel2')
self.assertTrue(len(self.piwigo.sublevels('level2')))
self.piwigo.removedirs('level2')
self.piwigo.logout()
def test_downloadImage(self):
self.piwigo.login(self.usertest, self.passwordtest)
self.piwigo.mkdir('level')
img = os.path.join(os.path.dirname(os.path.abspath(__file__)),'samplepiwigotools.jpg')
id = self.piwigo.upload(image=img, path="level")
imgdst = os.path.join(os.path.dirname(os.path.abspath(__file__)),'download.jpg')
self.assertTrue(self.piwigo.download("level / samplepiwigotools.jpg",imgdst))
os.remove(imgdst)
self.piwigo.remove('level / samplepiwigotools.jpg')
self.piwigo.removedirs('level')
self.piwigo.logout()
if __name__ == '__main__':
unittest.main()