python: Support status callbacks.
authorJustus Winter <justus@gnupg.org>
Tue, 24 May 2016 13:14:53 +0000 (15:14 +0200)
committerJustus Winter <justus@gnupg.org>
Tue, 24 May 2016 16:00:16 +0000 (18:00 +0200)
* lang/python/helpers.c (pyStatusCb): New function.
(pygpgme_set_status_cb): Likewise.
* lang/python/helpers.h (pygpgme_set_status_cb): New prototype.
* lang/python/pyme/core.py (Context.__init__): Initialize
'last_statuscb'.
(Context._free_statuscb): New function.
(Context.set_status_cb): Likewise.
* lang/python/tests/t-callbacks.py: Test status callbacks.

Signed-off-by: Justus Winter <justus@gnupg.org>
lang/python/helpers.c
lang/python/helpers.h
lang/python/pyme/core.py
lang/python/tests/t-callbacks.py

index d0c1f3b..ec7264a 100644 (file)
@@ -272,7 +272,72 @@ void pygpgme_set_progress_cb(gpgme_ctx_t ctx, PyObject *cb, PyObject **freelater
   *freelater = cb;
   gpgme_set_progress_cb(ctx, (gpgme_progress_cb_t) pyProgressCb, (void *) cb);
 }
+\f
+/* Status callbacks.  */
+static gpgme_error_t pyStatusCb(void *hook, const char *keyword,
+                                const char *args) {
+  gpgme_error_t err = 0;
+  PyObject *pyhook = (PyObject *) hook;
+  PyObject *self = NULL;
+  PyObject *func = NULL;
+  PyObject *dataarg = NULL;
+  PyObject *pyargs = NULL;
+  PyObject *retval = NULL;
+
+  assert (PyTuple_Check(pyhook));
+  assert (PyTuple_Size(pyhook) == 2 || PyTuple_Size(pyhook) == 3);
+  self = PyTuple_GetItem(pyhook, 0);
+  func = PyTuple_GetItem(pyhook, 1);
+  if (PyTuple_Size(pyhook) == 3) {
+    dataarg = PyTuple_GetItem(pyhook, 2);
+    pyargs = PyTuple_New(3);
+  } else {
+    pyargs = PyTuple_New(2);
+  }
+
+  if (keyword)
+    PyTuple_SetItem(pyargs, 0, PyUnicode_DecodeUTF8(keyword, strlen (keyword),
+                                                    "strict"));
+  else
+    {
+      Py_INCREF(Py_None);
+      PyTuple_SetItem(pyargs, 0, Py_None);
+    }
+  PyTuple_SetItem(pyargs, 1, PyUnicode_DecodeUTF8(args, strlen (args),
+                                                "strict"));
+  if (PyErr_Occurred()) {
+    err = gpg_error(GPG_ERR_GENERAL);
+    Py_DECREF(pyargs);
+    goto leave;
+  }
+
+  if (dataarg) {
+    Py_INCREF(dataarg);
+    PyTuple_SetItem(pyargs, 2, dataarg);
+  }
+
+  retval = PyObject_CallObject(func, pyargs);
+  if (PyErr_Occurred())
+    err = pygpgme_exception2code();
+  Py_DECREF(pyargs);
+  Py_XDECREF(retval);
+
+ leave:
+  if (err)
+    pygpgme_stash_callback_exception(self);
+  return err;
+}
 
+void pygpgme_set_status_cb(gpgme_ctx_t ctx, PyObject *cb,
+                           PyObject **freelater) {
+  if (cb == Py_None) {
+    gpgme_set_status_cb(ctx, NULL, NULL);
+    return;
+  }
+  Py_INCREF(cb);
+  *freelater = cb;
+  gpgme_set_status_cb(ctx, (gpgme_status_cb_t) pyStatusCb, (void *) cb);
+}
 \f
 /* Edit callbacks.  */
 gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,
index 4bd8ef8..5dd88a0 100644 (file)
@@ -35,6 +35,8 @@ PyObject *pygpgme_raise_callback_exception(PyObject *self);
 void pygpgme_set_passphrase_cb(gpgme_ctx_t ctx, PyObject *cb,
                               PyObject **freelater);
 void pygpgme_set_progress_cb(gpgme_ctx_t ctx, PyObject *cb, PyObject **freelater);
+void pygpgme_set_status_cb(gpgme_ctx_t ctx, PyObject *cb,
+                           PyObject **freelater);
 
 gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,
                       const char *args, int fd);
index 43a2413..f15444b 100644 (file)
@@ -64,6 +64,7 @@ class Context(GpgmeWrapper):
         super().__init__(wrapped)
         self.last_passcb = None
         self.last_progresscb = None
+        self.last_statuscb = None
 
     def __del__(self):
         if not pygpgme:
@@ -91,6 +92,14 @@ class Context(GpgmeWrapper):
                 pygpgme.delete_PyObject_p_p(self.last_progresscb)
             self.last_progresscb = None
 
+    def _free_statuscb(self):
+        if self.last_statuscb != None:
+            if pygpgme.pygpgme_clear_generic_cb:
+                pygpgme.pygpgme_clear_generic_cb(self.last_statuscb)
+            if pygpgme.delete_PyObject_p_p:
+                pygpgme.delete_PyObject_p_p(self.last_statuscb)
+            self.last_statuscb = None
+
     def op_keylist_all(self, *args, **kwargs):
         self.op_keylist_start(*args, **kwargs)
         key = self.op_keylist_next()
@@ -195,6 +204,29 @@ class Context(GpgmeWrapper):
                 hookdata = (self, func, hook)
         pygpgme.pygpgme_set_progress_cb(self.wrapped, hookdata, self.last_progresscb)
 
+    def set_status_cb(self, func, hook=None):
+        """Sets the status callback to the function specified by FUNC.  If
+        FUNC is None, the callback will be cleared.
+
+        The function will be called with two arguments, keyword and
+        args.  If HOOK is not None, it will be supplied as third
+        argument.
+
+        Please see the GPGME manual for more information.
+
+        """
+        self._free_statuscb()
+        if func == None:
+            hookdata = None
+        else:
+            self.last_statuscb = pygpgme.new_PyObject_p_p()
+            if hook == None:
+                hookdata = (self, func)
+            else:
+                hookdata = (self, func, hook)
+        pygpgme.pygpgme_set_status_cb(self.wrapped, hookdata,
+                                      self.last_statuscb)
+
     def get_engine_info(self):
         """Returns this context specific engine info"""
         return pygpgme.gpgme_ctx_get_engine_info(self.wrapped)
index d962dc4..5797526 100755 (executable)
@@ -146,3 +146,38 @@ except Exception as e:
     assert e == myException
 else:
     assert False, "Expected an error, got none"
+
+
+
+# Test the status callback.
+source = core.Data("Hallo Leute\n")
+sink = core.Data()
+
+status_cb_called = False
+def status_cb(keyword, args, hook=None):
+    global status_cb_called
+    status_cb_called = True
+    assert hook == cookie
+
+c = core.Context()
+c.set_status_cb(status_cb, cookie)
+c.set_ctx_flag("full-status", "1")
+c.op_encrypt([alpha], constants.ENCRYPT_ALWAYS_TRUST, source, sink)
+assert status_cb_called
+
+# Test exceptions.
+source = core.Data("Hallo Leute\n")
+sink = core.Data()
+
+def status_cb(keyword, args):
+    raise myException
+
+c = core.Context()
+c.set_status_cb(status_cb, None)
+c.set_ctx_flag("full-status", "1")
+try:
+    c.op_encrypt([alpha], constants.ENCRYPT_ALWAYS_TRUST, source, sink)
+except Exception as e:
+    assert e == myException
+else:
+    assert False, "Expected an error, got none"