# Logo Search packages:
# Sourcecode: offlineimap version File versions  Download package

# Curses.py

# Curses-based interfaces
# Copyright (C) 2003 John Goerzen
# <jgoerzen@complete.org>
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from Blinkenlights import BlinkenBase
from UIBase import UIBase
from threading import *
import thread, time, sys, os, signal, time
from offlineimap import version, threadutil
from offlineimap.threadutil import MultiLock

import curses, curses.panel, curses.textpad, curses.wrapper

acctkeys = '1234567890abcdefghijklmnoprstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-=;/.,'

class CursesUtil:
    """Owner of the curses screen shared by the UI classes in this file.

    Holds the standard screen (``stdscr``), its dimensions, a lock that
    serialises curses I/O (``iolock``) and a cache of allocated color
    pairs protected by ``pairlock``.
    """

    def __init__(self):
        """Create the locks and immediately put the terminal in curses mode."""
        self.pairlock = Lock()
        # NOTE(review): callers in this file acquire this lock while already
        # holding it, so MultiLock is presumably re-entrant -- confirm in
        # offlineimap.threadutil.
        self.iolock = MultiLock()
        self.start()

    def initpairs(self):
        """Reset the color-pair cache so it only knows white-on-black (pair 0)."""
        self.pairlock.acquire()
        try:
            self.pairs = {self._getpairindex(curses.COLOR_WHITE,
                                             curses.COLOR_BLACK): 0}
            self.nextpair = 1
        finally:
            self.pairlock.release()

    def lock(self):
        """Acquire the curses I/O lock."""
        self.iolock.acquire()

    def unlock(self):
        """Release the curses I/O lock."""
        self.iolock.release()

    def locked(self, target, *args, **kwargs):
        """Perform an operation with full locking.

        Calls ``target(*args, **kwargs)`` while holding the I/O lock and
        returns whatever ``target`` returns.  The lock is released even if
        ``target`` raises.
        """
        self.lock()
        try:
            return target(*args, **kwargs)
        finally:
            self.unlock()

    def refresh(self):
        """Redraw all panels and flush pending output, under the I/O lock."""
        def lockedstuff():
            curses.panel.update_panels()
            curses.doupdate()
        self.locked(lockedstuff)

    def isactive(self):
        """Return true while curses mode is on (between start() and stop())."""
        return hasattr(self, 'stdscr')

    def _getpairindex(self, fg, bg):
        """Return the cache key used for the (fg, bg) color combination."""
        return '%d/%d' % (fg, bg)

    def getpair(self, fg, bg):
        """Return the curses attribute for foreground fg on background bg.

        Returns 0 when the terminal has no color support.  Pairs are
        allocated on first use and cached afterwards.
        """
        if not self.has_color:
            return 0
        pindex = self._getpairindex(fg, bg)
        self.pairlock.acquire()
        try:
            if pindex not in self.pairs:
                # Initialise the pair before recording it, so a failing
                # init_pair() does not leave a stale cache entry behind.
                curses.init_pair(self.nextpair, fg, bg)
                self.pairs[pindex] = self.nextpair
                self.nextpair += 1
            return curses.color_pair(self.pairs[pindex])
        finally:
            self.pairlock.release()

    def start(self):
        """Enter curses mode and record screen size and color capability."""
        self.stdscr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        self.stdscr.keypad(1)
        try:
            curses.start_color()
            self.has_color = curses.has_colors()
        except curses.error:
            # Color initialisation failed: carry on in monochrome.
            self.has_color = 0

        # Remember the previous cursor visibility so stop() can restore it;
        # stays None when the terminal cannot change cursor visibility.
        self.oldcursor = None
        try:
            self.oldcursor = curses.curs_set(0)
        except curses.error:
            pass

        self.stdscr.clear()
        self.stdscr.refresh()
        (self.height, self.width) = self.stdscr.getmaxyx()
        self.initpairs()

    def stop(self):
        """Leave curses mode and restore the terminal; no-op if not active."""
        if not hasattr(self, 'stdscr'):
            return
        if self.oldcursor is not None:
            curses.curs_set(self.oldcursor)
        self.stdscr.refresh()
        self.stdscr.keypad(0)
        curses.nocbreak()
        curses.echo()
        curses.endwin()
        # Removing the attribute is what makes isactive() report false.
        del self.stdscr

    def reset(self):
        """Restart curses mode, re-reading the terminal size."""
        self.stop()
        self.start()

class CursesAccountFrame:
    """One account's status line: hotkey, state/countdown, name and lights.

    The line starts with a lead string drawn by drawleadstr(); each thread
    frame handed out by getnewthreadframe() occupies one further column.
    """

    def __init__(s, master, accountname):
        """master is the shared CursesUtil; accountname labels the line."""
        s.c = master
        s.children = []
        s.accountname = accountname
        # Set here as well as in startsleep() so that sleeping() cannot hit
        # an AttributeError when it runs before startsleep()/syncnow().
        s.sleeping_abort = 0

    def drawleadstr(s, secs = None):
        """Draw the lead string at column 0 and record where it ends.

        With secs given, shows the remaining time as minutes:seconds;
        otherwise shows "[active]".  Requires setwindow() to have supplied
        the window and hotkey.
        """
        if secs is None:
            acctstr = '%s: [active] %13.13s: ' % (s.key, s.accountname)
        else:
            acctstr = '%s: [%3d:%02d] %13.13s: ' % (s.key,
                                                    secs // 60, secs % 60,
                                                    s.accountname)
        s.c.locked(s.window.addstr, 0, 0, acctstr)
        # Thread lights are placed starting right after the lead string.
        s.location = len(acctstr)

    def setwindow(s, window, key):
        """Attach the frame to window, using key as its hotkey, and redraw."""
        s.window = window
        s.key = key
        s.drawleadstr()
        # Re-home every existing light into the new window, one column each.
        for child in s.children:
            child.update(window, 0, s.location)
            s.location += 1

    def getnewthreadframe(s):
        """Create, register and return a light in the next free column."""
        tf = CursesThreadFrame(s.c, s.window, 0, s.location)
        s.location += 1
        s.children.append(tf)
        return tf

    def startsleep(s, sleepsecs):
        """Begin a sleep period: clear any pending abort request."""
        s.sleeping_abort = 0

    def sleeping(s, sleepsecs, remainingsecs):
        """Update the countdown and sleep for one interval.

        While remainingsecs is non-zero the countdown is drawn and the call
        blocks for sleepsecs seconds; when it reaches zero the line goes
        back to "[active]" without sleeping.  Returns true if syncnow() has
        requested that the sleep be cut short.
        """
        s.c.lock()
        try:
            if remainingsecs:
                s.drawleadstr(remainingsecs)
            else:
                s.drawleadstr()
            s.window.refresh()
        finally:
            s.c.unlock()
        if remainingsecs:
            # Sleep outside the lock so other threads can keep drawing.
            time.sleep(sleepsecs)
        return s.sleeping_abort

    def syncnow(s):
        """Ask the current sleep to end early."""
        s.sleeping_abort = 1

class CursesThreadFrame:
    """A one-character status light drawn at a fixed cell of a window."""

    def __init__(s, master, window, y, x):
        """Place a light at (y, x) of window.

        master should be a CursesUtil object.
        """
        s.c = master
        s.window = window
        s.x = x
        s.y = y
        s.colors = []
        background = curses.COLOR_BLACK
        # (name, foreground color, extra attribute), in the order the pairs
        # are requested from the master.
        palette = (
            ('black', curses.COLOR_BLACK, 0),
            ('gray', curses.COLOR_WHITE, 0),
            ('white', curses.COLOR_WHITE, curses.A_BOLD),
            ('blue', curses.COLOR_BLUE, 0),
            ('red', curses.COLOR_RED, 0),
            ('purple', curses.COLOR_MAGENTA, 0),
            ('cyan', curses.COLOR_CYAN, 0),
            ('green', curses.COLOR_GREEN, 0),
            ('orange', curses.COLOR_YELLOW, 0),
            ('yellow', curses.COLOR_YELLOW, curses.A_BOLD),
            ('pink', curses.COLOR_RED, curses.A_BOLD),
        )
        s.colormap = {}
        for name, foreground, extra in palette:
            s.colormap[name] = extra | s.c.getpair(foreground, background)
        # A new light starts out dark.
        s.setcolor('black')

    def setcolor(self, color):
        """Switch the light to the named color and repaint it."""
        self.colorname = color
        self.color = self.colormap[color]
        self.display()

    def display(self):
        """Paint the light: a blank when 'black', otherwise a dot."""
        def paint():
            glyph = '.'
            if self.getcolor() == 'black':
                glyph = ' '
            self.window.addstr(self.y, self.x, glyph, self.color)
            # Park the cursor in the bottom-right corner of the screen.
            self.c.stdscr.move(self.c.height - 1, self.c.width - 1)
            self.window.refresh()
        self.c.locked(paint)

    def getcolor(self):
        """Return the name of the current color."""
        return self.colorname

    def getcolorpair(self):
        """Return the curses attribute of the current color."""
        return self.color

    def update(self, window, y, x):
        """Move the light to (y, x) of window and repaint it there."""
        self.window = window
        self.y = y
        self.x = x
        self.display()

    def setthread(self, newthread):
        """Reset the light to dark; newthread is currently not consulted."""
        self.setcolor('black')

class InputHandler:
    """Arbitrates keyboard input between a background reader and callers.

    A daemon thread reads keys and passes them to the "background"
    handler, unless a caller has taken exclusive control with
    input_acquire().  ``statuslock`` protects ``bgchar`` and ``lockheld``;
    ``startup`` wakes the reader thread when either of them changes in a
    way that may allow it to read again.
    """

    def __init__(s, util):
        """util is the CursesUtil whose stdscr keys are read from."""
        s.c = util
        s.bgchar = None
        s.inputlock = Lock()
        s.lockheld = 0
        s.statuslock = Lock()
        s.startup = Event()
        s.startthread()

    def startthread(s):
        """Start the daemon thread running bgreaderloop()."""
        s.thread = threadutil.ExitNotifyThread(target = s.bgreaderloop,
                                               name = "InputHandler loop")
        s.thread.setDaemon(1)
        s.thread.start()

    def bgreaderloop(s):
        """Body of the reader thread; never returns."""
        while 1:
            s.statuslock.acquire()
            if s.lockheld or s.bgchar is None:
                # Nothing to read for.  Clear the event *before* releasing
                # statuslock: every wake-up is signalled after the state
                # change it announces, so none can be lost, and a stale
                # earlier set() no longer makes wait() return immediately
                # and turn this loop into a busy spin.
                s.startup.clear()
                s.statuslock.release()
                s.startup.wait()
            else:
                s.statuslock.release()
                ch = s.c.stdscr.getch()
                s.statuslock.acquire()
                try:
                    if s.lockheld or s.bgchar is None:
                        # Someone took over input while we were blocked in
                        # getch(); hand the key back to curses.
                        curses.ungetch(ch)
                    else:
                        s.bgchar(ch)
                finally:
                    s.statuslock.release()

    def set_bgchar(s, callback):
        """Sets a "background" character handler.  If a key is pressed
        while not doing anything else, it will be passed to this handler.

        callback is a function taking a single arg -- the char pressed.

        If callback is None, clears the request."""
        s.statuslock.acquire()
        try:
            oldhandler = s.bgchar
            s.bgchar = callback
            # Only a transition from "no handler" to "handler" can unblock
            # the reader thread.
            if callback and not oldhandler:
                s.startup.set()
        finally:
            s.statuslock.release()

    def input_acquire(s):
        """Call this method when you want exclusive input control.
        Make sure to call input_release afterwards!
        """
        s.inputlock.acquire()
        s.statuslock.acquire()
        try:
            s.lockheld = 1
        finally:
            s.statuslock.release()

    def input_release(s):
        """Call this method when you are done getting input."""
        s.statuslock.acquire()
        try:
            s.lockheld = 0
        finally:
            s.statuslock.release()
        s.inputlock.release()
        # Let the reader thread resume background reading.
        s.startup.set()
        
class Blinkenlights(BlinkenBase, UIBase):
    """Curses user interface: banner, scrolling log and per-account lines.

    The screen is laid out as one banner line at the top, a log window
    below it, and one status line per known account stacked upwards from
    the bottom.  Methods such as gettf(), getthreadaccount() and _msg(),
    and the ``tflock`` attribute, are not defined here; they presumably
    come from BlinkenBase/UIBase.
    """

    def init_banner(s):
        """Start curses, build the windows and hook up input and resizing."""
        s.af = {}
        s.aflock = Lock()
        s.c = CursesUtil()
        s.text = []
        BlinkenBase.init_banner(s)
        s.setupwindows()
        s.inputhandler = InputHandler(s.c)
        s.gettf().setcolor('red')
        s._msg(version.banner)
        s.inputhandler.set_bgchar(s.keypress)
        # The resize state must exist before the handler that uses it can
        # possibly fire.
        s.resizelock = Lock()
        s.resizecount = 0
        signal.signal(signal.SIGWINCH, s.resizehandler)

    def resizehandler(s, signum, frame):
        """SIGWINCH handler: rebuild the screen for the new terminal size."""
        s.resizeterm()

    def resizeterm(s, dosleep = 1):
        """Restart curses and redraw every window after a terminal resize.

        If a resize is already in progress the request is only counted, and
        the running resize loops once more for it.  With dosleep true, a
        second pass is made one second later.
        """
        if not s.resizelock.acquire(0):
            s.resizecount += 1
            return
        try:
            signal.signal(signal.SIGWINCH, signal.SIG_IGN)
            # NOTE(review): this runs inside a signal handler; if the
            # interrupted code already holds aflock this acquire would
            # block -- confirm whether that can happen in practice.
            s.aflock.acquire()
            try:
                s.c.lock()
                try:
                    s.resizecount += 1
                    while s.resizecount:
                        s.c.reset()
                        s.setupwindows()
                        s.resizecount -= 1
                finally:
                    s.c.unlock()
            finally:
                s.aflock.release()
        finally:
            # Always give the lock back and re-arm the handler, even when
            # redrawing failed.
            s.resizelock.release()
            signal.signal(signal.SIGWINCH, s.resizehandler)
        if dosleep:
            time.sleep(1)
            s.resizeterm(0)

    def isusable(s):
        """Return true if a curses interface can be started here."""
        # Not a terminal?  Can't use curses.  Both streams must be ttys.
        if not (sys.stdout.isatty() and sys.stdin.isatty()):
            return 0

        # No TERM specified?  Can't use curses.
        if not os.environ.get('TERM'):
            return 0

        # ncurses doesn't want to start?  Can't use curses.
        # This test is nasty because initscr() actually EXITS on error,
        # so it is tried in a child process.
        pid = os.fork()
        if pid:
            # parent: usable only if the child exited normally with status 0
            # (a child killed by a signal does not count as success).
            status = os.waitpid(pid, 0)[1]
            return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
        else:
            # child: must never return into the caller's code, so leave via
            # os._exit(), which skips exception handlers and exit hooks
            # inherited from the parent.
            exitcode = 1
            try:
                curses.initscr()
                curses.endwin()
                # If we didn't die by here, indicate success.
                exitcode = 0
            finally:
                os._exit(exitcode)

    def keypress(s, key):
        """Background key handler: 'q' quits, account hotkeys end a sleep."""
        # Ignore anything chr() cannot represent (function keys, or a
        # negative "no input" value).
        if key < 0 or key > 255:
            return

        char = chr(key)
        if char == 'q':
            # Request to quit.
            s.terminate()
            return

        try:
            index = acctkeys.index(char)
        except ValueError:
            # Key not a valid one: exit.
            return

        if index >= len(s.hotkeys):
            # Not in our list of valid hotkeys.
            return

        # Trying to end sleep somewhere.
        s.getaccountframe(s.hotkeys[index]).syncnow()

    def getpass(s, accountname, config, errmsg = None):
        """Prompt in the log window for accountname's password and return it.

        config and errmsg are accepted but not used here.
        """
        s.inputhandler.input_acquire()
        try:
            # See comment in _display for info on why both locks are
            # obtained, and in which order.
            s.tflock.acquire()
            try:
                s.c.lock()
                try:
                    s.gettf().setcolor('white')
                    s._addline(" *** Input Required", s.gettf().getcolorpair())
                    s._addline(" *** Please enter password for account %s: " % accountname,
                               s.gettf().getcolorpair())
                    s.logwindow.refresh()
                    password = s.logwindow.getstr()
                finally:
                    s.c.unlock()
            finally:
                s.tflock.release()
        finally:
            s.inputhandler.input_release()
        return password

    def setupwindows(s):
        """(Re)create the banner, log and account windows for the current size."""
        s.c.lock()
        try:
            s.bannerwindow = curses.newwin(1, s.c.width, 0, 0)
            s.setupwindow_drawbanner()
            # One line for the banner, one per account; the log gets the rest.
            s.logheight = s.c.height - 1 - len(s.af)
            s.logwindow = curses.newwin(s.logheight, s.c.width, 1, 0)
            s.logwindow.idlok(1)
            s.logwindow.scrollok(1)
            s.logwindow.move(s.logheight - 1, 0)
            s.setupwindow_drawlog()
            # Reverse-sorted because the lines are handed out bottom-up.
            accounts = list(s.af.keys())
            accounts.sort()
            accounts.reverse()

            pos = s.c.height - 1
            index = 0
            s.hotkeys = []
            for account in accounts:
                accountwindow = curses.newwin(1, s.c.width, pos, 0)
                s.af[account].setwindow(accountwindow, acctkeys[index])
                # hotkeys[i] is the account reached by pressing acctkeys[i].
                s.hotkeys.append(account)
                index += 1
                pos -= 1

            curses.doupdate()
        finally:
            s.c.unlock()

    def setupwindow_drawbanner(s):
        """Draw product name/version (left) and copyright (right) in the banner."""
        if s.c.has_color:
            color = s.c.getpair(curses.COLOR_WHITE, curses.COLOR_BLUE) | \
                    curses.A_BOLD
        else:
            color = curses.A_REVERSE
        s.bannerwindow.bkgd(' ', color) # Fill background with that color
        s.bannerwindow.addstr("%s %s" % (version.productname,
                                         version.versionstr))
        s.bannerwindow.addstr(0, s.bannerwindow.getmaxyx()[1] - len(version.copyright) - 1,
                              version.copyright)

        s.bannerwindow.noutrefresh()

    def setupwindow_drawlog(s):
        """Set the log background and replay the remembered log lines."""
        if s.c.has_color:
            color = s.c.getpair(curses.COLOR_WHITE, curses.COLOR_BLACK)
        else:
            color = curses.A_NORMAL
        s.logwindow.bkgd(' ', color)
        for line, linecolor in s.text:
            s.logwindow.addstr("\n" + line, linecolor)
        s.logwindow.noutrefresh()

    def getaccountframe(s, accountname = None):
        """Return the CursesAccountFrame for accountname, creating it if new.

        With no name given, the account of the calling thread is used.
        Adding an account changes the layout, so the screen is rebuilt.
        """
        if accountname is None:
            accountname = s.getthreadaccount()
        s.aflock.acquire()
        try:
            if accountname in s.af:
                return s.af[accountname]

            # New one.
            s.af[accountname] = CursesAccountFrame(s.c, accountname)
            s.c.lock()
            try:
                s.c.reset()
                s.setupwindows()
            finally:
                s.c.unlock()
        finally:
            s.aflock.release()
        return s.af[accountname]


    def _display(s, msg, color = None):
        """Append msg to the log window, optionally setting the light color.

        Falls back to plain printing when curses is not active.
        """
        if "\n" in msg:
            # NOTE(review): color is not forwarded for multi-line messages.
            for thisline in msg.split("\n"):
                s._msg(thisline)
            return

        # We must acquire both locks.  Otherwise, deadlock can result.
        # This can happen if one thread calls _msg (locking curses, then
        # tf) and another tries to set the color (locking tf, then curses)
        #
        # By locking both up-front here, in this order, we prevent deadlock.

        s.tflock.acquire()
        s.c.lock()
        try:
            if not s.c.isactive():
                # For dumping out exceptions and stuff.
                print(msg)
                return
            if color:
                s.gettf().setcolor(color)
            s._addline(msg, s.gettf().getcolorpair())
            s.logwindow.refresh()
        finally:
            s.c.unlock()
            s.tflock.release()

    def _addline(s, msg, color):
        """Write one line to the log window and remember it for redraws."""
        s.c.lock()
        try:
            s.logwindow.addstr("\n" + msg, color)
            s.text.append((msg, color))
            # Keep only as many lines as the log window can show.  A
            # non-positive height keeps nothing (and can no longer loop
            # forever, as the line-at-a-time trimming did).
            excess = len(s.text) - max(s.logheight, 0)
            if excess > 0:
                s.text = s.text[excess:]
        finally:
            s.c.unlock()

    def terminate(s, exitstatus = 0):
        """Leave curses mode, then defer to UIBase.terminate."""
        s.c.stop()
        UIBase.terminate(s, exitstatus)

    def threadException(s, thread):
        """Leave curses mode, then defer to UIBase.threadException."""
        s.c.stop()
        UIBase.threadException(s, thread)

    def mainException(s):
        """Leave curses mode, then defer to UIBase.mainException."""
        s.c.stop()
        UIBase.mainException(s)

    def sleep(s, sleepsecs):
        """Announce the next sync time, then defer to BlinkenBase.sleep."""
        s.gettf().setcolor('red')
        s._msg("Next sync in %d:%02d" % (sleepsecs // 60, sleepsecs % 60))
        BlinkenBase.sleep(s, sleepsecs)
            
if __name__ == '__main__':
    # Manual demo: show the Blinkenlights banner for five seconds, then a
    # chart of every foreground color on black/blue, normal and bold.
    import time

    x = Blinkenlights(None)
    x.init_banner()
    try:
        time.sleep(5)
    finally:
        x.c.stop()

    fgs = {'black': curses.COLOR_BLACK, 'red': curses.COLOR_RED,
           'green': curses.COLOR_GREEN, 'yellow': curses.COLOR_YELLOW,
           'blue': curses.COLOR_BLUE, 'magenta': curses.COLOR_MAGENTA,
           'cyan': curses.COLOR_CYAN, 'white': curses.COLOR_WHITE}

    # (title, background color, extra attribute) for each of the four columns.
    panels = (("Black/normal\n", curses.COLOR_BLACK, 0),
              ("Blue/normal\n", curses.COLOR_BLUE, 0),
              ("Black/bright\n", curses.COLOR_BLACK, curses.A_BOLD),
              ("Blue/bright\n", curses.COLOR_BLUE, curses.A_BOLD))

    x = CursesUtil()
    try:
        # newwin() needs whole numbers, so divide with //.
        colwidth = x.width // 4 - 1
        windows = []
        left = 0
        for title, bg, attr in panels:
            win = curses.newwin(x.height, colwidth, 0, left)
            win.addstr(title)
            for name, fg in fgs.items():
                win.addstr("%s\n" % name, x.getpair(fg, bg) | attr)
            windows.append(win)
            # The next column starts where this one ends.
            left += win.getmaxyx()[1]

        for win in windows:
            win.refresh()
        x.stdscr.refresh()
        time.sleep(5)
    finally:
        # Restore the terminal even if drawing failed (e.g. window too small).
        x.stop()
    print(x.has_color)
    print(x.height)
    print(x.width)


# Generated by  Doxygen 1.6.0   Back to index