 bindings/python/libjio.c |    6 +-
 bindings/python/setup.py |    2 +-
 libjio/Makefile          |    2 +-
 libjio/check.c           |   11 ++
 libjio/trans.c           |   48 +++++--
 tests/stress/jiostress   |  364 +++++++++++++++++++++++++++++++++++++---------
 6 files changed, 344 insertions(+), 89 deletions(-)

diff --git a/bindings/python/libjio.c b/bindings/python/libjio.c
index 46b257d..b75ba98 100644
--- a/bindings/python/libjio.c
+++ b/bindings/python/libjio.c
@@ -279,7 +279,7 @@ static PyObject *jf_write(jfile_object *fp, PyObject *args)
 {
 	long rv;
 	unsigned char *buf;
-	int len;
+	ssize_t len;
 
 	if (!PyArg_ParseTuple(args, "s#:write", &buf, &len))
 		return NULL;
@@ -307,7 +307,7 @@ static PyObject *jf_pwrite(jfile_object *fp, PyObject *args)
 	long rv;
 	unsigned char *buf;
 	long long offset;
-	int len;
+	ssize_t len;
 
 	if (!PyArg_ParseTuple(args, "s#L:pwrite", &buf, &len, &offset))
 		return NULL;
@@ -659,7 +659,7 @@ It's a wrapper to jtrans_add_w().\n");
 static PyObject *jt_add_w(jtrans_object *tp, PyObject *args)
 {
 	long rv;
-	int len;
+	ssize_t len;
 	long long offset;
 	unsigned char *buf;
 
diff --git a/bindings/python/setup.py b/bindings/python/setup.py
index c594e5d..78ba462 100644
--- a/bindings/python/setup.py
+++ b/bindings/python/setup.py
@@ -20,7 +20,7 @@ libjio = Extension("libjio",
 
 setup(
 	name = 'libjio',
-	version = '1.00',
+	version = '1.01',
 	description = "A library for journaled, transactional I/O",
 	author = "Alberto Bertogli",
 	author_email = "albertito@blitiri.com.ar",
diff --git a/libjio/Makefile b/libjio/Makefile
index a32828f..c17eaae 100644
--- a/libjio/Makefile
+++ b/libjio/Makefile
@@ -60,7 +60,7 @@ endif
 
 
 # library version, used for soname and generated documentation
-LIB_VER=1.00
+LIB_VER=1.01
 LIB_SO_VER=1
 
 
diff --git a/libjio/check.c b/libjio/check.c
index f3e37e2..4afb9e3 100644
--- a/libjio/check.c
+++ b/libjio/check.c
@@ -123,6 +123,17 @@ enum jfsck_return jfsck(const char *name, const char *jdir,
 
 	fs.name = (char *) name;
 
+	/* Locking the whole file protect us from concurrent runs, but it's
+	 * not to be trusted nor assumed (lingering transactions break it): it
+	 * just helps prevent some accidents. */
+	lr = plockf(fs.fd, F_LOCKW, 0, 0);
+	if (lr == -1) {
+		/* In the future, we may want to differentiate this case from
+		 * a normal I/O error. */
+		ret = J_EIO;
+		goto exit;
+	}
+
 	if (jdir == NULL) {
 		fs.jdir = (char *) malloc(PATH_MAX);
 		if (fs.jdir == NULL) {
diff --git a/libjio/trans.c b/libjio/trans.c
index e1747a8..1f7b66f 100644
--- a/libjio/trans.c
+++ b/libjio/trans.c
@@ -80,25 +80,45 @@ void jtrans_free(struct jtrans *ts)
  * be either F_LOCKW or F_UNLOCK. Returns 0 on success, -1 on error. */
 static int lock_file_ranges(struct jtrans *ts, int mode)
 {
-	off_t lr;
-	struct operation *op;
+	unsigned int nops;
+	off_t lr, min_offset;
+	struct operation *op, *start_op;
 
 	if (ts->flags & J_NOLOCK)
 		return 0;
 
-	for (op = ts->op; op != NULL; op = op->next) {
-		if (mode == F_LOCKW) {
-			lr = plockf(ts->fs->fd, F_LOCKW, op->offset, op->len);
-			if (lr == -1)
-				goto error;
-			op->locked = 1;
-		} else if (mode == F_UNLOCK && op->locked) {
-			lr = plockf(ts->fs->fd, F_UNLOCK, op->offset,
-					op->len);
-			if (lr == -1)
-				goto error;
-			op->locked = 0;
+	/* Lock/unlock always in the same order to avoid deadlocks. We will
+	 * begin with the operation that has the smallest start offset, and go
+	 * from there.
+	 * Note that this is O(n^2), but n is usually (very) small, and we're
+	 * about to do synchronous I/O, so it's not really worrying. It has a
+	 * small optimization to help when the operations tend to be in the
+	 * right order. */
+	nops = 0;
+	min_offset = 0;
+	start_op = ts->op;
+	while (nops < ts->numops_r + ts->numops_w) {
+		for (op = start_op; op != NULL; op = op->next) {
+			if (min_offset < op->offset)
+				continue;
+			min_offset = op->offset;
+			start_op = op->next;
+
+			if (mode == F_LOCKW) {
+				lr = plockf(ts->fs->fd, F_LOCKW, op->offset, op->len);
+				if (lr == -1)
+					goto error;
+				op->locked = 1;
+			} else if (mode == F_UNLOCK && op->locked) {
+				lr = plockf(ts->fs->fd, F_UNLOCK, op->offset,
+						op->len);
+				if (lr == -1)
+					goto error;
+				op->locked = 0;
+			}
 		}
+
+		nops++;
 	}
 
 	return 0;
diff --git a/tests/stress/jiostress b/tests/stress/jiostress
index 02f65cf..8f0773e 100755
--- a/tests/stress/jiostress
+++ b/tests/stress/jiostress
@@ -9,8 +9,12 @@ failures.
 
 import sys
 import os
+import time
+import select
 import random
+import fcntl
 import traceback
+from optparse import OptionParser
 import libjio
 
 try:
@@ -43,9 +47,13 @@ def randfloat(min, max):
 class ConsistencyError (Exception):
 	pass
 
-def jfsck(fname):
+def jfsck(fname, cleanup = False):
+	flags = 0
+	if cleanup:
+		flags = libjio.J_CLEANUP
+
 	try:
-		r = libjio.jfsck(fname)
+		r = libjio.jfsck(fname, flags = flags)
 		return r
 	except IOError as e:
 		if e.args[0] == libjio.J_ENOJOURNAL:
@@ -86,6 +94,124 @@ def pread(fd, start, end):
 	return r
 
 #
+# Output handler, used to get a nice output when using multiple processes
+#
+
+class OutputHandler:
+	def __init__(self, every):
+		# fds to read from
+		self.rs = []
+
+		# we will report every this number of seconds
+		self.every = every
+
+		# how many transactions has each child processed; we use the
+		# read end of the pipe to identify them
+		self.ntrans = {}
+
+		# like self.ntrans but counts only the failed ones
+		self.nfailures = {}
+
+		# fd to write to, only relevant in the child
+		self.w = None
+
+		# p = parent, c = child
+		self.end = 'p'
+
+		# last transaction number print
+		self.last_print = 0
+
+		# time of the last print
+		self.last_print_time = 0
+
+	def prefork(self):
+		r, w = os.pipe()
+		self.rs.append(r)
+		self.ntrans[r] = 0
+		self.nfailures[r] = 0
+		self.w = w
+
+	def child(self):
+		self.end = 'c'
+		os.close(self.rs[-1])
+		self.rs = []
+
+	def parent(self):
+		os.close(self.w)
+		self.w = None
+
+	SUCCESS = bytes('1', encoding = 'ascii')
+	FAILURE = bytes('0', encoding = 'ascii')
+
+	def feed(self, success = True):
+		if success:
+			os.write(self.w, OutputHandler.SUCCESS)
+		else:
+			os.write(self.w, OutputHandler.FAILURE)
+
+	def output_loop(self):
+		while self.rs:
+			rr, rw, rx = select.select(self.rs, [], [], 1)
+			for r in rr:
+				d = os.read(r, 1)
+				if not d:
+					self.rs.remove(r)
+				else:
+					self.ntrans[r] += 1
+					if d == OutputHandler.FAILURE:
+						self.nfailures[r] += 1
+
+			self.cond_print()
+		self.print()
+		return sum(self.ntrans.values()), sum(self.nfailures.values())
+
+	def cond_print(self):
+		if time.time() - self.last_print_time >= self.every:
+			self.print()
+
+	def print(self):
+		self.last_print_time = time.time()
+		for r in sorted(self.ntrans):
+			print("%4d" % self.ntrans[r], end = ' ')
+		print()
+
+
+#
+# Lock manager, used to lock ranges between multiple processes
+#
+# We can't lock the real file because that would ruin libjio's locking, so we
+# create a new file, remove it, and use fcntl locking. Not very elegant but it
+# does the trick.
+#
+
+class VoidLockManager:
+	def __init__(self):
+		pass
+
+	def lock(self, start, end):
+		pass
+
+	def unlock(self, start, end):
+		pass
+
+class LockManager:
+	def __init__(self):
+		fname = "/tmp/js-lock-tmp." + str(os.getpid())
+		self.fd = open(fname, 'w+')
+		os.unlink(fname)
+
+	def lock(self, start, end):
+		#print(os.getpid(), '\tlock:', start, end)
+		#sys.stdout.flush()
+		fcntl.lockf(self.fd, fcntl.LOCK_EX, end - start, start)
+
+	def unlock(self, start, end):
+		#print(os.getpid(), '\tunlock:', start, end)
+		#sys.stdout.flush()
+		fcntl.lockf(self.fd, fcntl.LOCK_UN, end - start, start)
+
+
+#
 # A range of bytes inside a file, used inside the transactions
 #
 # Note it can't "remember" the fd as it may change between prepare() and
@@ -93,7 +219,7 @@ def pread(fd, start, end):
 #
 
 class Range:
-	def __init__(self, fsize, maxlen):
+	def __init__(self, fsize, maxlen, lockmgr):
 		# public
 		self.start, self.end = randfrange(fsize, maxlen)
 		self.new_data = None
@@ -103,12 +229,21 @@ class Range:
 		self.prev_data = None
 		self.new_data_ctx = None
 		self.read_buf = None
+		self.lockmgr = lockmgr
+		self.locked = False
 
 		# read an extended range so we can check we
 		# only wrote what we were supposed to
 		self.ext_start = max(0, self.start - 32)
 		self.ext_end = min(fsize, self.end + 32)
 
+	def __lt__(self, other):
+		return self.ext_start < other.ext_start
+
+	def __del__(self):
+		if self.locked:
+			self.lockmgr.unlock(self.ext_start, self.ext_end)
+
 	def overlaps(self, other):
 		if (other.ext_start <= self.ext_start <= other.ext_end) or \
 		   (other.ext_start <= self.ext_end <= other.ext_end) or \
@@ -120,6 +255,8 @@ class Range:
 	def prepare_r(self):
 		self.type = 'r'
 		self.read_buf = bytearray(self.end - self.start)
+		self.lockmgr.lock(self.ext_start, self.ext_end)
+		self.locked = True
 
 	def verify_r(self, fd):
 		real_data = pread(fd, self.start, self.end)
@@ -130,6 +267,9 @@ class Range:
 
 	def prepare_w(self, fd):
 		self.type = 'w'
+		self.lockmgr.lock(self.ext_start, self.ext_end)
+		self.locked = True
+
 		self.prev_data = pread(fd, self.ext_start, self.ext_end)
 
 		self.new_data = getbytes(self.end - self.start)
@@ -172,7 +312,7 @@ class Range:
 
 class T_base:
 	"Interface for the transaction types"
-	def __init__(self, f, jf, fsize):
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
 		pass
 
 	def prepare(self):
@@ -185,13 +325,14 @@ class T_base:
 		pass
 
 class T_jwrite (T_base):
-	def __init__(self, f, jf, fsize):
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
 		self.f = f
 		self.jf = jf
 		self.fsize = fsize
+		self.do_verify = do_verify
 
-		self.maxoplen = min(int(fsize / 256), 16 * 1024 * 1024)
-		self.range = Range(self.fsize, self.maxoplen)
+		self.maxoplen = min(int(fsize / 256), 2 * 1024 * 1024)
+		self.range = Range(self.fsize, self.maxoplen, lockmgr)
 
 	def prepare(self):
 		self.range.prepare_w(self.f)
@@ -200,23 +341,26 @@ class T_jwrite (T_base):
 		self.jf.pwrite(self.range.new_data, self.range.start)
 
 	def verify(self, write_only = False):
+		if not self.do_verify:
+			return
 		self.range.verify(self.f)
 
 class T_writeonly (T_base):
-	def __init__(self, f, jf, fsize):
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
 		self.f = f
 		self.jf = jf
 		self.fsize = fsize
+		self.do_verify = do_verify
 
 		# favour many small ops
-		self.maxoplen = 1 * 1024 * 1024
+		self.maxoplen = 512 * 1024
 		self.nops = random.randint(1, 26)
 
 		self.ranges = []
 
 		c = 0
 		while len(self.ranges) < self.nops and c < self.nops * 1.25:
-			candidate = Range(self.fsize, self.maxoplen)
+			candidate = Range(self.fsize, self.maxoplen, lockmgr)
 			safe = True
 			for r in self.ranges:
 				if candidate.overlaps(r):
@@ -226,6 +370,10 @@ class T_writeonly (T_base):
 				self.ranges.append(candidate)
 			c += 1
 
+		# sort the transactions so there's no risk of internal
+		# deadlocks via the lock manager
+		self.ranges.sort()
+
 	def prepare(self):
 		for r in self.ranges:
 			r.prepare_w(self.f)
@@ -237,6 +385,9 @@ class T_writeonly (T_base):
 		t.commit()
 
 	def verify(self, write_only = False):
+		if not self.do_verify:
+			return
+
 		try:
 			for r in self.ranges:
 				r.verify(self.f)
@@ -249,8 +400,8 @@ class T_writeonly (T_base):
 			raise
 
 class T_readwrite (T_writeonly):
-	def __init__(self, f, jf, fsize):
-		T_writeonly.__init__(self, f, jf, fsize)
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
+		T_writeonly.__init__(self, f, jf, fsize, lockmgr, do_verify)
 		self.read_ranges = []
 
 	def prepare(self):
@@ -270,6 +421,9 @@ class T_readwrite (T_writeonly):
 		t.commit()
 
 	def verify(self, write_only = False):
+		if not self.do_verify:
+			return
+
 		try:
 			for r in self.ranges:
 				if write_only and r.type == 'r':
@@ -291,12 +445,16 @@ t_list = [T_jwrite, T_writeonly, T_readwrite]
 #
 
 class Stresser:
-	def __init__(self, fname, fsize, nops, use_fi, use_as):
+	def __init__(self, fname, fsize, nops, use_fi, use_as, output,
+			lockmgr, do_verify):
 		self.fname = fname
 		self.fsize = fsize
 		self.nops = nops
 		self.use_fi = use_fi
 		self.use_as = use_as
+		self.output = output
+		self.lockmgr = lockmgr
+		self.do_verify = do_verify
 
 		jflags = 0
 		if use_as:
@@ -352,7 +510,10 @@ class Stresser:
 			# parent
 			id, status = os.waitpid(pid, 0)
 			if not os.WIFEXITED(status):
-				raise RuntimeError(status)
+				i = (status,
+					os.WIFSIGNALED(status),
+					os.WTERMSIG(status))
+				raise RuntimeError(i)
 
 			if os.WEXITSTATUS(status) != 0:
 				return False
@@ -394,31 +555,25 @@ class Stresser:
 
 	def run(self):
 		nfailures = 0
-		sys.stdout.write("  ")
 
 		for i in range(1, self.nops + 1):
-			sys.stdout.write(".")
-			if i % 10 == 0:
-				sys.stdout.write(" ")
-			if i % 50 == 0:
-				sys.stdout.write(" %d\n" % i)
-				sys.stdout.write("  ")
-			sys.stdout.flush()
-
 			trans = random.choice(t_list)(self.f, self.jf,
-					self.fsize)
+					self.fsize, self.lockmgr,
+					self.do_verify)
 
 			if self.use_fi:
 				r = self.apply_fork(trans)
 			else:
 				r = self.apply(trans)
-			if not r:
+
+			if r:
+				self.output.feed(success = True)
+			else:
+				self.output.feed(success = False)
 				nfailures += 1
 				r = self.reopen(trans)
 				trans.verify(write_only = True)
 
-		sys.stdout.write("\n")
-		sys.stdout.flush()
 		return nfailures
 
 
@@ -426,58 +581,127 @@ class Stresser:
 # Main
 #
 
-def usage():
-	print("""
-Use: jiostress <file name> <file size in Mb> [<number of operations>]
-	[--fi] [--as]
+def run_stressers(nproc, fname, fsize, nops, use_fi, use_as, output, lockmgr,
+		do_verify):
+	pids = []
+	print("Launching stress test")
+	for i in range(nproc):
+		# Calculate how many operations will this child perform. The
+		# last one will work a little more so we get exactly nops.
+		# Note we prefer to work extra in the end rather than having
+		# the last process with 0 child_nops, that's why we use int()
+		# instead of round() or ceil().
+		child_nops = int(nops / nproc)
+		if i == nproc - 1:
+			child_nops = nops - int(nops / nproc) * i
+
+		output.prefork()
+		sys.stdout.flush()
+		pid = os.fork()
+		if pid == 0:
+			# child
+			output.child()
+			s = Stresser(fname, fsize, child_nops, use_fi, use_as,
+					output, lockmgr, do_verify)
+			s.run()
+			sys.exit(0)
+		else:
+			output.parent()
+			pids.append(pid)
+
+	print("Launched stress tests")
+	totalops, nfailures = output.output_loop()
+	print("Stress test completed, waiting for children")
+	nerrors = 0
+	for pid in pids:
+		rpid, status = os.waitpid(pid, 0)
+		if os.WEXITSTATUS(status) != 0:
+			nerrors += 1
+
+	print("  %d operations" % totalops)
+	print("  %d simulated failures" % nfailures)
+	print("  %d processes ended with errors" % nerrors)
+	if nerrors:
+		return False
+	return True
 
-If the number of operations is not provided, the default (500) will be
-used.
+def main():
+	usage = "Use: %prog [options] <file name> <file size in Mb>"
+	parser = OptionParser(usage = usage)
+	parser.add_option("-n", "--nops", dest = "nops", type = "int",
+		default = 100,
+		help = "number of operations (defaults to %default)")
+	parser.add_option("-p", "--nproc", dest = "nproc", type = "int",
+		default = 1,
+		help = "number of processes (defaults to %default)")
+	parser.add_option("", "--fi", dest = "use_fi",
+		action = "store_true", default = False,
+		help = "use fault injection (conflicts with --as and -p > 1)")
+	parser.add_option("", "--as", dest = "use_as",
+		action = "store_true", default = False,
+		help = "use J_LINGER + autosync (conflicts with --fi)")
+	parser.add_option("", "--no-internal-lock",
+		dest = "use_internal_locks", action = "store_false",
+		default = True,
+		help = "do not lock internally, disables verification")
+	parser.add_option("", "--no-verify", dest = "do_verify",
+		action = "store_false", default = True,
+		help = "do not perform verifications")
+	parser.add_option("", "--keep", dest = "keep",
+		action = "store_true", default = False,
+		help = "keep the file after completing the test")
+	parser.add_option("", "--force", dest = "force",
+		action = "store_true", default = False,
+		help = "force the tests to run, even if conflicting"
+			+ " options are selected")
+
+	options, args = parser.parse_args()
+
+	if len(args) != 2:
+		parser.print_help()
+		return 1
+
+	fname = args[0]
+	try:
+		fsize = int(args[1]) * 1024 * 1024
+	except ValueError:
+		print("Error: the size of the file must be numeric")
+		return 1
 
-If the "--fi" option is passed, the test will perform fault injection. This
-option conflicts with "--as".
+	if not options.force:
+		if options.use_fi and options.use_as:
+			print("Error: --fi and --as cannot be used together")
+			return 1
 
-If the "--as" option is passed, lingering transactions will be used, along
-with the automatic syncing thread. This option conflicts with "--fi".
-""")
+		if options.use_fi and options.nproc > 1:
+			print("Error: --fi cannot be used with multiple processes")
+			return 1
 
+	if not options.use_internal_locks:
+		options.do_verify = False
 
-def main():
-	try:
-		fname = sys.argv[1]
-		fsize = int(sys.argv[2]) * 1024 * 1024
-		nops = 500
-		if len(sys.argv) >= 4 and sys.argv[3].isnumeric():
-			nops = int(sys.argv[3])
-
-		use_fi = False
-		if '--fi' in sys.argv:
-			use_fi = True
-
-		use_as = False
-		if '--as' in sys.argv:
-			use_as = True
-	except:
-		usage()
-		sys.exit(1)
-
-	if use_fi and use_as:
-		print("Error: --fi and --as cannot be used together")
-		sys.exit(1)
-
-	s = Stresser(fname, fsize, nops, use_fi, use_as)
-	print("Running stress test")
-	nfailures = s.run()
-	del s
-	print("Stress test completed")
-	print("  %d operations" % nops)
-	print("  %d simulated failures" % nfailures)
+	output = OutputHandler(every = 2)
+	if options.use_internal_locks:
+		lockmgr = LockManager()
+	else:
+		lockmgr = VoidLockManager()
+
+	success = run_stressers(options.nproc, fname, fsize, options.nops,
+			options.use_fi, options.use_as, output, lockmgr,
+			options.do_verify)
 
 	r = jfsck(fname)
 	print("Final check completed")
-	#os.unlink(fname)
+	if success and not options.keep:
+		jfsck(fname, cleanup = True)
+		os.unlink(fname)
+
+	if not success:
+		print("Test failed")
+		return 1
+	return 0
 
 
 if __name__ == '__main__':
-	main()
+	sys.exit(main())
 
