python: Fix teardown of ephemeral contexts.
authorJustus Winter <justus@g10code.com>
Fri, 17 Feb 2017 11:18:56 +0000 (12:18 +0100)
committerJustus Winter <justus@g10code.com>
Fri, 17 Feb 2017 11:18:56 +0000 (12:18 +0100)
* lang/python/tests/support.py (EphemeralContext): New function.
* lang/python/tests/t-quick-key-creation.py: Use the new function to
manage ephemeral contexts.
* lang/python/tests/t-quick-key-manipulation.py: Likewise.
* lang/python/tests/t-quick-subkey-creation.py: Likewise.
--

Previously, there was a problem with cleaning up ephemeral home
directories.  shutil.rmtree deleted the agents main socket, gpg-agent
detected that, and deleted the other sockets as well, racing
shutil.rmtree which did not cope will with that.

Fix this by asking the agent nicely to shut down.

Signed-off-by: Justus Winter <justus@g10code.com>
lang/python/tests/support.py
lang/python/tests/t-quick-key-creation.py
lang/python/tests/t-quick-key-manipulation.py
lang/python/tests/t-quick-subkey-creation.py

index ed5bf61..a381270 100644 (file)
 from __future__ import absolute_import, print_function, unicode_literals
 del absolute_import, print_function, unicode_literals
 
+import contextlib
+import shutil
 import sys
 import os
 import tempfile
+import time
 import gpg
 
 # known keys
@@ -85,5 +88,24 @@ else:
             self.path = tempfile.mkdtemp()
             return self.path
         def __exit__(self, *args):
-            import shutil
             shutil.rmtree(self.path)
+
+@contextlib.contextmanager
+def EphemeralContext():
+    with TemporaryDirectory() as tmp:
+        home = os.environ['GNUPGHOME']
+        shutil.copy(os.path.join(home, "gpg.conf"), tmp)
+        shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp)
+
+        with gpg.Context(home_dir=tmp) as ctx:
+            yield ctx
+
+            # Ask the agent to quit.
+            agent_socket = os.path.join(tmp, "S.gpg-agent")
+            ctx.protocol = gpg.constants.protocol.ASSUAN
+            ctx.set_engine_info(ctx.protocol, file_name=agent_socket)
+            ctx.assuan_transact(["KILLAGENT"])
+
+            # Block until it is really gone.
+            while os.path.exists(agent_socket):
+                time.sleep(.01)
index ea63dc3..c642c5b 100755 (executable)
@@ -22,42 +22,33 @@ del absolute_import, print_function, unicode_literals
 
 import gpg
 import itertools
-import os
-import shutil
 import time
 
 import support
 
 alpha = "Alpha <alpha@invalid.example.net>"
 
-def copy_configuration(destination):
-    home = os.environ['GNUPGHOME']
-    shutil.copy(os.path.join(home, "gpg.conf"), destination)
-    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination)
+with support.EphemeralContext() as ctx:
+    res = ctx.create_key(alpha)
 
-with support.TemporaryDirectory() as tmp:
-    copy_configuration(tmp)
-    with gpg.Context(home_dir=tmp) as ctx:
-        res = ctx.create_key(alpha)
+    keys = list(ctx.keylist())
+    assert len(keys) == 1, "Weird number of keys created"
 
-        keys = list(ctx.keylist())
-        assert len(keys) == 1, "Weird number of keys created"
+    key = keys[0]
+    assert key.fpr == res.fpr
+    assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
+    assert key.subkeys[0].expires > 0, "Expected primary key to expire"
 
-        key = keys[0]
-        assert key.fpr == res.fpr
-        assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
-        assert key.subkeys[0].expires > 0, "Expected primary key to expire"
-
-        # Try to create a key with the same UID
-        try:
-            ctx.create_key(alpha)
-            assert False, "Expected an error but got none"
-        except gpg.errors.GpgError as e:
-            pass
+    # Try to create a key with the same UID
+    try:
+        ctx.create_key(alpha)
+        assert False, "Expected an error but got none"
+    except gpg.errors.GpgError as e:
+        pass
 
-        # Try to create a key with the same UID, now with force!
-        res2 = ctx.create_key(alpha, force=True)
-        assert res.fpr != res2.fpr
+    # Try to create a key with the same UID, now with force!
+    res2 = ctx.create_key(alpha, force=True)
+    assert res.fpr != res2.fpr
 
 
 # From here on, we use one context, and create unique UIDs
@@ -67,85 +58,82 @@ def make_uid():
     uid_counter += 1
     return "user{0}@invalid.example.org".format(uid_counter)
 
-with support.TemporaryDirectory() as tmp:
-    copy_configuration(tmp)
-    with gpg.Context(home_dir=tmp) as ctx:
-
-        # Check gpg.constants.create.NOEXPIRE...
-        res = ctx.create_key(make_uid(), expires=False)
-        key = ctx.get_key(res.fpr, secret=True)
-        assert key.fpr == res.fpr
-        assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
-        assert key.subkeys[0].expires == 0, "Expected primary key not to expire"
-
-        t = 2 * 24 * 60 * 60
-        slack = 5 * 60
-        res = ctx.create_key(make_uid(), expires_in=t)
-        key = ctx.get_key(res.fpr, secret=True)
-        assert key.fpr == res.fpr
-        assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
-        assert abs(time.time() + t - key.subkeys[0].expires) < slack, \
-            "Primary keys expiration time is off"
-
-        # Check capabilities
-        for sign, encrypt, certify, authenticate in itertools.product([False, True],
-                                                                      [False, True],
-                                                                      [False, True],
-                                                                      [False, True]):
-            # Filter some out
-            if not (sign or encrypt or certify or authenticate):
-                # This triggers the default capabilities tested before.
-                continue
-            if (sign or encrypt or authenticate) and not certify:
-                # The primary key always certifies.
-                continue
-
-            res = ctx.create_key(make_uid(), algorithm="rsa",
-                                 sign=sign, encrypt=encrypt, certify=certify,
-                                 authenticate=authenticate)
-            key = ctx.get_key(res.fpr, secret=True)
-            assert key.fpr == res.fpr
-            assert len(key.subkeys) == 1, \
-                "Expected no subkey for non-default capabilities"
-
-            p = key.subkeys[0]
-            assert sign == p.can_sign
-            assert encrypt == p.can_encrypt
-            assert certify == p.can_certify
-            assert authenticate == p.can_authenticate
-
-        # Check algorithm
-        res = ctx.create_key(make_uid(), algorithm="rsa")
-        key = ctx.get_key(res.fpr, secret=True)
-        assert key.fpr == res.fpr
-        for k in key.subkeys:
-            assert k.pubkey_algo == 1
-
-        # Check algorithm with size
-        res = ctx.create_key(make_uid(), algorithm="rsa1024")
+with support.EphemeralContext() as ctx:
+    # Check gpg.constants.create.NOEXPIRE...
+    res = ctx.create_key(make_uid(), expires=False)
+    key = ctx.get_key(res.fpr, secret=True)
+    assert key.fpr == res.fpr
+    assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
+    assert key.subkeys[0].expires == 0, "Expected primary key not to expire"
+
+    t = 2 * 24 * 60 * 60
+    slack = 5 * 60
+    res = ctx.create_key(make_uid(), expires_in=t)
+    key = ctx.get_key(res.fpr, secret=True)
+    assert key.fpr == res.fpr
+    assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
+    assert abs(time.time() + t - key.subkeys[0].expires) < slack, \
+        "Primary keys expiration time is off"
+
+    # Check capabilities
+    for sign, encrypt, certify, authenticate in itertools.product([False, True],
+                                                                  [False, True],
+                                                                  [False, True],
+                                                                  [False, True]):
+        # Filter some out
+        if not (sign or encrypt or certify or authenticate):
+            # This triggers the default capabilities tested before.
+            continue
+        if (sign or encrypt or authenticate) and not certify:
+            # The primary key always certifies.
+            continue
+
+        res = ctx.create_key(make_uid(), algorithm="rsa",
+                             sign=sign, encrypt=encrypt, certify=certify,
+                             authenticate=authenticate)
         key = ctx.get_key(res.fpr, secret=True)
         assert key.fpr == res.fpr
-        for k in key.subkeys:
-            assert k.pubkey_algo == 1
-            assert k.length == 1024
-
-        # Check algorithm future-default
-        ctx.create_key(make_uid(), algorithm="future-default")
-
-        # Check passphrase protection
-        recipient = make_uid()
-        passphrase = "streng geheim"
-        res = ctx.create_key(recipient, passphrase=passphrase)
-        ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)])
-
-        cb_called = False
-        def cb(*args):
-            global cb_called
-            cb_called = True
-            return passphrase
-        ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
-        ctx.set_passphrase_cb(cb)
-
-        plaintext, _, _ = ctx.decrypt(ciphertext)
-        assert plaintext == b"hello there"
-        assert cb_called
+        assert len(key.subkeys) == 1, \
+            "Expected no subkey for non-default capabilities"
+
+        p = key.subkeys[0]
+        assert sign == p.can_sign
+        assert encrypt == p.can_encrypt
+        assert certify == p.can_certify
+        assert authenticate == p.can_authenticate
+
+    # Check algorithm
+    res = ctx.create_key(make_uid(), algorithm="rsa")
+    key = ctx.get_key(res.fpr, secret=True)
+    assert key.fpr == res.fpr
+    for k in key.subkeys:
+        assert k.pubkey_algo == 1
+
+    # Check algorithm with size
+    res = ctx.create_key(make_uid(), algorithm="rsa1024")
+    key = ctx.get_key(res.fpr, secret=True)
+    assert key.fpr == res.fpr
+    for k in key.subkeys:
+        assert k.pubkey_algo == 1
+        assert k.length == 1024
+
+    # Check algorithm future-default
+    ctx.create_key(make_uid(), algorithm="future-default")
+
+    # Check passphrase protection
+    recipient = make_uid()
+    passphrase = "streng geheim"
+    res = ctx.create_key(recipient, passphrase=passphrase)
+    ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)])
+
+    cb_called = False
+    def cb(*args):
+        global cb_called
+        cb_called = True
+        return passphrase
+    ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
+    ctx.set_passphrase_cb(cb)
+
+    plaintext, _, _ = ctx.decrypt(ciphertext)
+    assert plaintext == b"hello there"
+    assert cb_called
index 62c395a..12c18ce 100755 (executable)
@@ -21,83 +21,72 @@ from __future__ import absolute_import, print_function, unicode_literals
 del absolute_import, print_function, unicode_literals
 
 import gpg
-import itertools
-import os
-import shutil
-import time
 
 import support
 
 alpha = "Alpha <alpha@invalid.example.net>"
 bravo = "Bravo <bravo@invalid.example.net>"
 
-def copy_configuration(destination):
-    home = os.environ['GNUPGHOME']
-    shutil.copy(os.path.join(home, "gpg.conf"), destination)
-    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination)
+with support.EphemeralContext() as ctx:
+    res = ctx.create_key(alpha, certify=True)
+    key = ctx.get_key(res.fpr)
+    assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
+    assert len(key.uids) == 1, "Expected exactly one UID"
 
-with support.TemporaryDirectory() as tmp:
-    copy_configuration(tmp)
-    with gpg.Context(home_dir=tmp) as ctx:
-        res = ctx.create_key(alpha, certify=True)
+    def get_uid(uid):
         key = ctx.get_key(res.fpr)
-        assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
-        assert len(key.uids) == 1, "Expected exactly one UID"
+        for u in key.uids:
+            if u.uid == uid:
+                return u
+        return None
 
-        def get_uid(uid):
-            key = ctx.get_key(res.fpr)
-            for u in key.uids:
-                if u.uid == uid:
-                    return u
-            return None
+    # sanity check
+    uid = get_uid(alpha)
+    assert uid, "UID alpha not found"
+    assert uid.revoked == 0
 
-        # sanity check
-        uid = get_uid(alpha)
-        assert uid, "UID alpha not found"
-        assert uid.revoked == 0
+    # add bravo
+    ctx.key_add_uid(key, bravo)
+    uid = get_uid(bravo)
+    assert uid, "UID bravo not found"
+    assert uid.revoked == 0
 
-        # add bravo
-        ctx.key_add_uid(key, bravo)
-        uid = get_uid(bravo)
-        assert uid, "UID bravo not found"
-        assert uid.revoked == 0
+    # revoke alpha
+    ctx.key_revoke_uid(key, alpha)
+    uid = get_uid(alpha)
+    assert uid, "UID alpha not found"
+    assert uid.revoked == 1
+    uid = get_uid(bravo)
+    assert uid, "UID bravo not found"
+    assert uid.revoked == 0
 
-        # revoke alpha
+    # try to revoke the last UID
+    try:
         ctx.key_revoke_uid(key, alpha)
-        uid = get_uid(alpha)
-        assert uid, "UID alpha not found"
-        assert uid.revoked == 1
-        uid = get_uid(bravo)
-        assert uid, "UID bravo not found"
-        assert uid.revoked == 0
-
-        # try to revoke the last UID
-        try:
-            ctx.key_revoke_uid(key, alpha)
-            # IMHO this should fail.  issue2961.
-            # assert False, "Expected an error but got none"
-        except gpg.errors.GpgError:
-            pass
+        # IMHO this should fail.  issue2961.
+        # assert False, "Expected an error but got none"
+    except gpg.errors.GpgError:
+        pass
 
-        # Everything should be the same
-        uid = get_uid(alpha)
-        assert uid, "UID alpha not found"
-        assert uid.revoked == 1
-        uid = get_uid(bravo)
-        assert uid, "UID bravo not found"
-        assert uid.revoked == 0
+    # Everything should be the same
+    uid = get_uid(alpha)
+    assert uid, "UID alpha not found"
+    assert uid.revoked == 1
+    uid = get_uid(bravo)
+    assert uid, "UID bravo not found"
+    assert uid.revoked == 0
 
-        # try to revoke a non-existent UID
-        try:
-            ctx.key_revoke_uid(key, "i dont exist")
-            # IMHO this should fail.  issue2963.
-            # assert False, "Expected an error but got none"
-        except gpg.errors.GpgError:
-            pass
+    # try to revoke a non-existent UID
+    try:
+        ctx.key_revoke_uid(key, "i dont exist")
+        # IMHO this should fail.  issue2963.
+        # assert False, "Expected an error but got none"
+    except gpg.errors.GpgError:
+        pass
 
-        # try to add an pre-existent UID
-        try:
-            ctx.key_add_uid(key, bravo)
-            assert False, "Expected an error but got none"
-        except gpg.errors.GpgError:
-            pass
+    # try to add an pre-existent UID
+    try:
+        ctx.key_add_uid(key, bravo)
+        assert False, "Expected an error but got none"
+    except gpg.errors.GpgError:
+        pass
index 0d9f71f..ad4f35c 100755 (executable)
@@ -22,8 +22,6 @@ del absolute_import, print_function, unicode_literals
 
 import gpg
 import itertools
-import os
-import shutil
 import time
 
 import support
@@ -31,91 +29,84 @@ import support
 alpha = "Alpha <alpha@invalid.example.net>"
 bravo = "Bravo <bravo@invalid.example.net>"
 
-def copy_configuration(destination):
-    home = os.environ['GNUPGHOME']
-    shutil.copy(os.path.join(home, "gpg.conf"), destination)
-    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination)
-
-with support.TemporaryDirectory() as tmp:
-    copy_configuration(tmp)
-    with gpg.Context(home_dir=tmp) as ctx:
-        res = ctx.create_key(alpha, certify=True)
-        keys = list(ctx.keylist())
-        assert len(keys) == 1, "Weird number of keys created"
-        key = keys[0]
-        assert key.fpr == res.fpr
-        assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
-
-        def get_subkey(fpr):
-            k = ctx.get_key(fpr)
-            for sk in k.subkeys:
-                if sk.fpr == fpr:
-                    return sk
-            return None
-
-        # Check gpg.constants.create.NOEXPIRE...
-        res = ctx.create_subkey(key, expires=False)
+with support.EphemeralContext() as ctx:
+    res = ctx.create_key(alpha, certify=True)
+    keys = list(ctx.keylist())
+    assert len(keys) == 1, "Weird number of keys created"
+    key = keys[0]
+    assert key.fpr == res.fpr
+    assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
+
+    def get_subkey(fpr):
+        k = ctx.get_key(fpr)
+        for sk in k.subkeys:
+            if sk.fpr == fpr:
+                return sk
+        return None
+
+    # Check gpg.constants.create.NOEXPIRE...
+    res = ctx.create_subkey(key, expires=False)
+    subkey = get_subkey(res.fpr)
+    assert subkey.expires == 0, "Expected subkey not to expire"
+    assert subkey.can_encrypt, \
+        "Default subkey capabilities do not include encryption"
+
+    t = 2 * 24 * 60 * 60
+    slack = 5 * 60
+    res = ctx.create_subkey(key, expires_in=t)
+    subkey = get_subkey(res.fpr)
+    assert abs(time.time() + t - subkey.expires) < slack, \
+        "subkeys expiration time is off"
+
+    # Check capabilities
+    for sign, encrypt, authenticate in itertools.product([False, True],
+                                                         [False, True],
+                                                         [False, True]):
+        # Filter some out
+        if not (sign or encrypt or authenticate):
+            # This triggers the default capabilities tested before.
+            continue
+
+        res = ctx.create_subkey(key, sign=sign, encrypt=encrypt,
+                                authenticate=authenticate)
         subkey = get_subkey(res.fpr)
-        assert subkey.expires == 0, "Expected subkey not to expire"
-        assert subkey.can_encrypt, \
-            "Default subkey capabilities do not include encryption"
-
-        t = 2 * 24 * 60 * 60
-        slack = 5 * 60
-        res = ctx.create_subkey(key, expires_in=t)
-        subkey = get_subkey(res.fpr)
-        assert abs(time.time() + t - subkey.expires) < slack, \
-            "subkeys expiration time is off"
-
-        # Check capabilities
-        for sign, encrypt, authenticate in itertools.product([False, True],
-                                                             [False, True],
-                                                             [False, True]):
-            # Filter some out
-            if not (sign or encrypt or authenticate):
-                # This triggers the default capabilities tested before.
-                continue
-
-            res = ctx.create_subkey(key, sign=sign, encrypt=encrypt,
-                                    authenticate=authenticate)
-            subkey = get_subkey(res.fpr)
-            assert sign == subkey.can_sign
-            assert encrypt == subkey.can_encrypt
-            assert authenticate == subkey.can_authenticate
-
-        # Check algorithm
-        res = ctx.create_subkey(key, algorithm="rsa")
-        subkey = get_subkey(res.fpr)
-        assert subkey.pubkey_algo == 1
-
-        # Check algorithm with size
-        res = ctx.create_subkey(key, algorithm="rsa1024")
-        subkey = get_subkey(res.fpr)
-        assert subkey.pubkey_algo == 1
-        assert subkey.length == 1024
-
-        # Check algorithm future-default
-        ctx.create_subkey(key, algorithm="future-default")
-
-        # Check passphrase protection.  For this we create a new key
-        # so that we have a key with just one encryption subkey.
-        bravo_res = ctx.create_key(bravo, certify=True)
-        bravo_key = ctx.get_key(bravo_res.fpr)
-        assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys"
-
-        passphrase = "streng geheim"
-        res = ctx.create_subkey(bravo_key, passphrase=passphrase)
-        ciphertext, _, _ = ctx.encrypt(b"hello there",
-                                       recipients=[ctx.get_key(bravo_res.fpr)])
-
-        cb_called = False
-        def cb(*args):
-            global cb_called
-            cb_called = True
-            return passphrase
-        ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
-        ctx.set_passphrase_cb(cb)
-
-        plaintext, _, _ = ctx.decrypt(ciphertext)
-        assert plaintext == b"hello there"
-        assert cb_called
+        assert sign == subkey.can_sign
+        assert encrypt == subkey.can_encrypt
+        assert authenticate == subkey.can_authenticate
+
+    # Check algorithm
+    res = ctx.create_subkey(key, algorithm="rsa")
+    subkey = get_subkey(res.fpr)
+    assert subkey.pubkey_algo == 1
+
+    # Check algorithm with size
+    res = ctx.create_subkey(key, algorithm="rsa1024")
+    subkey = get_subkey(res.fpr)
+    assert subkey.pubkey_algo == 1
+    assert subkey.length == 1024
+
+    # Check algorithm future-default
+    ctx.create_subkey(key, algorithm="future-default")
+
+    # Check passphrase protection.  For this we create a new key
+    # so that we have a key with just one encryption subkey.
+    bravo_res = ctx.create_key(bravo, certify=True)
+    bravo_key = ctx.get_key(bravo_res.fpr)
+    assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys"
+
+    passphrase = "streng geheim"
+    res = ctx.create_subkey(bravo_key, passphrase=passphrase)
+    ciphertext, _, _ = ctx.encrypt(b"hello there",
+                                   recipients=[ctx.get_key(bravo_res.fpr)])
+
+    cb_called = False
+    def cb(*args):
+        global cb_called
+        cb_called = True
+        return passphrase
+    ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
+    ctx.set_passphrase_cb(cb)
+
+    plaintext, _, _ = ctx.decrypt(ciphertext)
+    assert plaintext == b"hello there"
+    assert cb_called