On Mon, May 09, 2016 at 04:43:44PM +0200, Michal Privoznik wrote:
On 05.05.2016 18:10, Daniel P. Berrange wrote:
> On Thu, Apr 28, 2016 at 12:04:47PM +0200, Michal Privoznik wrote:
>> <snip/>
>
> While it looks ok-ish in the context of the RecvAll function,
> because you have skipFunc + recvFunc both being invoked
> asynchronously, this design feels quite odd when used
> in combination with the plain virStreamRecv().
>
> Just for a minute, let's consider
>
>   st = virStreamNew(conn, 0);
>   virStreamRegisterSkip(st, skipFunc, &fd);
>   virStorageVolDownload(st, ...);
>
>   while (1) {
>       char buf[4096];
>       virStreamRecv(st, buf, sizeof(buf));
>
>       ..do something with buf..
>   }
>
> I think it is quite unpleasant to have the skipFunc be called
> out of band to the code dealing with the Recv.
>
>
>
> I think it is preferable that we have an explicit synchronous
> API for consuming the hole.
>
> The same really applies from POV of the upload side - I think
> we should be able to synchronously push the hole in, rather
> than having the asynchronous InData callback.
>
> IIUC, the reason you've chosen this async callback approach
> is because the virStream{Send,Recv,SendAll,RecvAll} methods
> are not extensible to cope with holes.
>
>
> I'm thinking though that we'd get a more attractive public API
> by creating hole friendly versions of virStream{Send,Recv,etc}
> instead of going the extra callback route.
>
>
> The download side could be made to work if we had a flag for
> virStreamRecv that instructed it to only read data up to the
> next hole, and defined an explicit error code to use to indicate
> that we've hit a hole.
>
> This would be used thus:
>
>   while (1) {
>       char buf[4096];
>
>       int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
>       if (ret == -1) {
>           virErrorPtr err = virGetLastError();
>           if (err && err->code == VIR_ERR_STREAM_HOLE) {
>               ret = virStreamHoleSize(st, buf);
Could this be made even friendlier by returning a special value
(say -2) in case of a hole?
>               ...seek ret bytes in target...
>           } else {
>               return -1;
>           }
>       } else {
>           ...write buf to target...
>       }
>   }
>
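To make the control flow of such a receive loop concrete, here is a
minimal, self-contained sketch that runs without libvirt. It models the
"special return value" variant (-2 on a hole) against an in-memory
stream. Every name in it (MockStream, mockRecvStopAtHole, mockHoleSize,
mockDownload, MOCK_STREAM_HOLE) is hypothetical and only stands in for
the proposed virStreamRecvFlags()/virStreamHoleSize() pair; it is not a
libvirt API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for illustration only; not libvirt APIs. */
#define MOCK_STREAM_HOLE (-2)

typedef struct {
    const char *data;   /* NULL marks a hole */
    size_t len;         /* bytes of data, or size of the hole (assumed > 0) */
} MockSegment;

typedef struct {
    const MockSegment *segs;
    size_t nsegs;
    size_t cur;         /* index of the current segment */
    size_t off;         /* bytes already consumed from segs[cur] */
} MockStream;

/* Returns bytes copied, 0 at end of stream, MOCK_STREAM_HOLE at a hole.
 * Data is never read across a hole boundary. */
static int mockRecvStopAtHole(MockStream *st, char *buf, size_t nbytes)
{
    if (st->cur == st->nsegs)
        return 0;

    const MockSegment *seg = &st->segs[st->cur];
    if (!seg->data)
        return MOCK_STREAM_HOLE;

    size_t n = seg->len - st->off;
    if (n > nbytes)
        n = nbytes;
    memcpy(buf, seg->data + st->off, n);
    st->off += n;
    if (st->off == seg->len) {
        st->cur++;
        st->off = 0;
    }
    return (int)n;
}

/* Consumes the pending hole and returns its size. */
static size_t mockHoleSize(MockStream *st)
{
    assert(st->cur < st->nsegs && !st->segs[st->cur].data);
    return st->segs[st->cur++].len;
}

/* Synchronous receive loop: copy data into target, "seek" over holes.
 * Returns the final offset in target, or -1 if target is too small. */
static long mockDownload(MockStream *st, char *target, size_t targetLen)
{
    size_t pos = 0;
    char buf[4];        /* deliberately tiny to exercise partial reads */

    for (;;) {
        int ret = mockRecvStopAtHole(st, buf, sizeof(buf));
        if (ret == 0)
            return (long)pos;

        if (ret == MOCK_STREAM_HOLE) {
            size_t hole = mockHoleSize(st);
            if (hole > targetLen - pos)
                return -1;
            pos += hole;                 /* skip, leaving target untouched */
        } else {
            if ((size_t)ret > targetLen - pos)
                return -1;
            memcpy(target + pos, buf, (size_t)ret);
            pos += (size_t)ret;
        }
    }
}
```

The point of the sketch is that the hole is consumed in the same loop
that consumes the data, so the caller always knows its position in the
target without any out-of-band callback.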
Okay, so imagine we have a STREAM_SKIP packet incoming. What should happen
if it is processed by bare virStreamRecv()? IOW, the user requests sparse
streams but sticks to calling the old virStreamRecv().
Given that the app has to pass in the flag VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM
(or similar), we could declare that it is an error to use the plain virStreamRecv
when you have already asked for a sparse stream.
Alternatively, we could just make virStreamRecv give back a buffer
full of zeros :-)
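For what the zero-filling alternative might look like, here is a rough
sketch, assuming the stream keeps a count of hole bytes still owed to the
caller. MockHoleState and mockRecvZeros are made-up names for
illustration and say nothing about how libvirt would track this
internally.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-stream state: hole bytes not yet handed to the caller. */
typedef struct {
    unsigned long long holeLeft;
} MockHoleState;

/* Expands the pending hole into zeros for a caller that does not know
 * about holes. Returns the number of zero bytes written into buf, which
 * is 0 once the hole has been fully consumed. */
static size_t mockRecvZeros(MockHoleState *hs, char *buf, size_t nbytes)
{
    assert(hs && buf);

    size_t n = nbytes;
    if (hs->holeLeft < n)
        n = (size_t)hs->holeLeft;

    memset(buf, 0, n);
    hs->holeLeft -= n;
    return n;
}
```

This keeps old callers working, at the cost of writing the zeros out on
the receiving side and so losing the sparseness in the target.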
One thing I'm worried about here is that the @flags of
virStreamRecvFlags() would not be transferred to the sender side as one
might expect.
Yep, that is true, but then everything about the stream APIs is
a bit different from the rest of the API.
> To make this work with virStreamRecvAll, we would need to add
> a virStreamSparseRecvAll() which had two callbacks eg
>
> typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, unsigned long long len);
>
>   virStreamSparseRecvAll(virStreamPtr stream,
>                          virStreamSinkFunc handler,
>                          virStreamSinkHoleFunc holeHandler,
>                          void *opaque) {
>       while (1) {
>           char buf[4096];
>
>           int ret = virStreamRecvFlags(stream, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
>           if (ret == -1) {
>               virErrorPtr err = virGetLastError();
>               if (err && err->code == VIR_ERR_STREAM_HOLE) {
>                   ret = virStreamHoleSize(stream, buf);
>                   holeHandler(stream, ret);
>               } else {
>                   return -1;
>               }
>           } else {
>               handler(stream, buf, ret, opaque);
>           }
>       }
>   }
>
>
> Now considering the upload side of things, the virStreamSend
> function doesn't actually need changes, though it could do
> with a flags parameter for best practice. We just need the
> virStreamSkip() function you already add.
>
>   while (1) {
>       char buf[4096];
>       if (..in hole...) {
>           ..get hole size...
>           virStreamSkip(st, len);
>       } else {
>           ...read N bytes...
>           virStreamSend(st, buf, len);
>       }
>   }
>
>
> The SendAll method is again more complicated as the callback we pass
> into it is insufficient to figure out if we have holes. We could
> add a virStreamSparseSendAll() which had two callbacks again:
>
> typedef int (*virStreamSourceHoleFunc)(virStreamPtr st);
>
> This returns the length of the current hole, or 0 if not at
> a hole.
>
>   virStreamSparseSendAll(virStreamPtr stream,
>                          virStreamSourceFunc handler,
>                          virStreamSourceHoleFunc holeHandler,
>                          void *opaque) {
>       while (1) {
>           char buf[4096];
>           int ret = holeHandler(stream);
>           if (ret > 0) {
>               virStreamSkip(stream, ret);
>           } else {
>               ret = handler(stream, buf, sizeof(buf), opaque);
>               virStreamSend(stream, buf, ret);
>           }
>       }
>   }
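The two-callback upload loop can be tried out in isolation with a small
mock, sketched here under the assumption that the source is a list of
in-memory extents and the sink simply counts bytes. MockSource, MockSink,
mockSourceHole, mockSourceRead and mockSparseSendAll are all hypothetical
names; the counters stand in for what virStreamSkip() and virStreamSend()
would put on the wire. Unlike the pseudocode, the sketch also stops at
end of input.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical types for illustration only; not libvirt APIs. */
typedef struct {
    const char *data;   /* NULL marks a hole */
    size_t len;         /* bytes of data, or size of the hole (assumed > 0) */
} MockExtent;

typedef struct {
    const MockExtent *ext;
    size_t next;        /* number of extents */
    size_t cur;         /* index of the current extent */
    size_t off;         /* bytes already read from ext[cur] */
} MockSource;

typedef struct {
    unsigned long long sent;      /* bytes that would go via a send call */
    unsigned long long skipped;   /* bytes that would go via a skip call */
} MockSink;

/* Hole callback: returns the length of the hole at the current position
 * and steps over it, or returns 0 if positioned at data or at the end. */
static unsigned long long mockSourceHole(MockSource *src)
{
    if (src->cur < src->next && !src->ext[src->cur].data)
        return src->ext[src->cur++].len;
    return 0;
}

/* Data callback: copies up to nbytes from the current data extent.
 * Returns 0 at end of input. The caller must check for a hole first. */
static int mockSourceRead(MockSource *src, char *buf, size_t nbytes)
{
    if (src->cur == src->next)
        return 0;

    const MockExtent *e = &src->ext[src->cur];
    assert(e->data);

    size_t n = e->len - src->off;
    if (n > nbytes)
        n = nbytes;
    memcpy(buf, e->data + src->off, n);
    src->off += n;
    if (src->off == e->len) {
        src->cur++;
        src->off = 0;
    }
    return (int)n;
}

/* Upload loop: ask for a hole first, otherwise read and send data. */
static void mockSparseSendAll(MockSource *src, MockSink *sink)
{
    char buf[4];        /* deliberately tiny to exercise partial reads */

    for (;;) {
        unsigned long long hole = mockSourceHole(src);
        if (hole > 0) {
            sink->skipped += hole;
            continue;
        }

        int ret = mockSourceRead(src, buf, sizeof(buf));
        if (ret == 0)
            break;
        sink->sent += (unsigned long long)ret;
    }
}
```

Because the hole callback is consulted before every read, a data read
never crosses into a hole, which is the property the synchronous design
relies on.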
>
>
> So we would avoid the virStreamInData and virStreamRegisterInData
> and virStreamRegisterSkip and virStreamSkipCallback methods.
Okay. Makes sense. Let me see if I can reuse my RPC implementation,
because that was the hardest part of all to implement.
Based on my understanding of your RPC design, I think it should still be
ok as is.
Regards,
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|